diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index b7c06e3d65..fd565d6233 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -696,6 +696,17 @@ impl> EpochManager { ) .await?; + let light_slot_timeout = { + let slot_length_u32 = u32::try_from(epoch_pda.protocol_config.slot_length) + .map_err(|_| ForesterError::Custom("Slot length overflow".into()))?; + + slot_duration() + .checked_mul(slot_length_u32) + .ok_or_else(|| { + ForesterError::Custom("Timeout calculation overflow".into()) + })? + }; + // TODO: measure accuracy // Optional replace with shutdown signal for all child processes let config = SendBatchedTransactionsConfig { @@ -706,7 +717,7 @@ impl> EpochManager { compute_unit_limit: Some(1_000_000), }, retry_config: RetryConfig { - timeout: slot_duration() * epoch_pda.protocol_config.slot_length as u32, + timeout: light_slot_timeout, ..self.config.retry_config }, light_slot_length: epoch_pda.protocol_config.slot_length, diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 3c6b5ded3e..054706015f 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -25,7 +25,8 @@ use solana_sdk::{ signature::{Keypair, Signer}, }; use std::sync::Arc; -use std::{time::Duration, vec}; +use std::time::Duration; +use std::vec; use tokio::join; use tokio::sync::Mutex; use tokio::time::{sleep, Instant}; @@ -39,9 +40,16 @@ pub trait TransactionBuilder { recent_blockhash: &Hash, work_items: &[WorkItem], config: BuildTransactionBatchConfig, - ) -> Vec; + ) -> Result>; } +// We're assuming that: +// 1. Helius slot latency is ~ 3 slots. +// See also: https://p.us5.datadoghq.com/sb/339e0590-c5d4-11ed-9c7b-da7ad0900005-231a672007c47d70f38e8fa321bc8407?fromUser=false&refresh_mode=sliding&tpl_var_leader_name%5B0%5D=%2A&from_ts=1725348612900&to_ts=1725953412900&live=true +// 2. Latency between forester server and helius is ~ 1 slot. +// 3. Slot duration is 500ms. +const LATENCY: Duration = Duration::from_millis(4 * 500); + /// Setting: /// 1. We have 1 light slot 15 seconds and a lot of elements in the queue /// 2. we want to send as many elements from the queue as possible @@ -112,7 +120,19 @@ pub async fn send_batched_transactions( work_items.chunks(config.build_transaction_batch_config.batch_size as usize) { // 6. Check if we reached the end of the light slot. - if (config.retry_config.timeout - start_time.elapsed()) < Duration::from_millis(20) { + let remaining_time = match config + .retry_config + .timeout + .checked_sub(start_time.elapsed()) + { + Some(time) => time, + None => { + debug!("Reached end of light slot"); + break; + } + }; + + if remaining_time < LATENCY { debug!("Reached end of light slot"); break; } @@ -127,7 +147,7 @@ pub async fn send_batched_transactions( work_items, config.build_transaction_batch_config, ) - .await; + .await?; debug!( "build transaction time {:?}", transaction_build_time_start.elapsed() @@ -139,6 +159,12 @@ pub async fn send_batched_transactions( .retry_config .timeout .saturating_sub(start_time.elapsed()); + + if remaining_time < LATENCY { + debug!("Reached end of light slot"); + break; + } + // Asynchronously send all transactions in the batch let send_futures = transactions.into_iter().map(move |tx| { let url = url.clone(); @@ -147,9 +173,8 @@ pub async fn send_batched_transactions( ..config.retry_config }; tokio::spawn(async move { - let mut rpc = - SolanaRpcConnection::new_with_retry(url, None, Some(retry_config)); - rpc.process_transaction(tx).await + let rpc = SolanaRpcConnection::new_with_retry(url, None, Some(retry_config)); + rpc.send_transaction(&tx).await }) }); @@ -217,7 +242,7 @@ impl> TransactionBuilder for EpochManagerTransac recent_blockhash: &Hash, work_items: &[WorkItem], config: BuildTransactionBatchConfig, - ) -> Vec { + ) -> Result> { let mut transactions = vec![]; let (_, all_instructions) = fetch_proofs_and_create_instructions( payer.pubkey(), @@ -226,8 +251,7 @@ impl> TransactionBuilder for EpochManagerTransac self.epoch, work_items, ) - .await - .unwrap(); + .await?; for instruction in all_instructions { let transaction = build_signed_transaction( payer, @@ -239,7 +263,7 @@ impl> TransactionBuilder for EpochManagerTransac .await; transactions.push(transaction); } - transactions + Ok(transactions) } }