Skip to content

Commit

Permalink
Refactor epoch manager timeout calculations (#1195)
Browse files Browse the repository at this point in the history
Improve timeout handling in epoch manager to prevent overflow errors. Introduce a checked multiplication for slot duration and update retry timeout accordingly. Additionally, modify transaction handling to break early based on calculated slot latency.
  • Loading branch information
sergeytimoshin authored Sep 10, 2024
1 parent 7ece15d commit 0911227
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
13 changes: 12 additions & 1 deletion forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,17 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
)
.await?;

let light_slot_timeout = {
let slot_length_u32 = u32::try_from(epoch_pda.protocol_config.slot_length)
.map_err(|_| ForesterError::Custom("Slot length overflow".into()))?;

slot_duration()
.checked_mul(slot_length_u32)
.ok_or_else(|| {
ForesterError::Custom("Timeout calculation overflow".into())
})?
};

// TODO: measure accuracy
// Optional replace with shutdown signal for all child processes
let config = SendBatchedTransactionsConfig {
Expand All @@ -706,7 +717,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
compute_unit_limit: Some(1_000_000),
},
retry_config: RetryConfig {
timeout: slot_duration() * epoch_pda.protocol_config.slot_length as u32,
timeout: light_slot_timeout,
..self.config.retry_config
},
light_slot_length: epoch_pda.protocol_config.slot_length,
Expand Down
46 changes: 35 additions & 11 deletions forester/src/send_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use solana_sdk::{
signature::{Keypair, Signer},
};
use std::sync::Arc;
use std::{time::Duration, vec};
use std::time::Duration;
use std::vec;
use tokio::join;
use tokio::sync::Mutex;
use tokio::time::{sleep, Instant};
Expand All @@ -39,9 +40,16 @@ pub trait TransactionBuilder {
recent_blockhash: &Hash,
work_items: &[WorkItem],
config: BuildTransactionBatchConfig,
) -> Vec<Transaction>;
) -> Result<Vec<Transaction>>;
}

// We're assuming that:
// 1. Helius slot latency is ~ 3 slots.
// See also: https://p.us5.datadoghq.com/sb/339e0590-c5d4-11ed-9c7b-da7ad0900005-231a672007c47d70f38e8fa321bc8407?fromUser=false&refresh_mode=sliding&tpl_var_leader_name%5B0%5D=%2A&from_ts=1725348612900&to_ts=1725953412900&live=true
// 2. Latency between forester server and helius is ~ 1 slot.
// 3. Slot duration is 500ms.
const LATENCY: Duration = Duration::from_millis(4 * 500);

/// Setting:
/// 1. We have one light slot of 15 seconds and a lot of elements in the queue
/// 2. We want to send as many elements from the queue as possible
Expand Down Expand Up @@ -112,7 +120,19 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
work_items.chunks(config.build_transaction_batch_config.batch_size as usize)
{
// 6. Check if we reached the end of the light slot.
if (config.retry_config.timeout - start_time.elapsed()) < Duration::from_millis(20) {
let remaining_time = match config
.retry_config
.timeout
.checked_sub(start_time.elapsed())
{
Some(time) => time,
None => {
debug!("Reached end of light slot");
break;
}
};

if remaining_time < LATENCY {
debug!("Reached end of light slot");
break;
}
Expand All @@ -127,7 +147,7 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
work_items,
config.build_transaction_batch_config,
)
.await;
.await?;
debug!(
"build transaction time {:?}",
transaction_build_time_start.elapsed()
Expand All @@ -139,6 +159,12 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
.retry_config
.timeout
.saturating_sub(start_time.elapsed());

if remaining_time < LATENCY {
debug!("Reached end of light slot");
break;
}

// Asynchronously send all transactions in the batch
let send_futures = transactions.into_iter().map(move |tx| {
let url = url.clone();
Expand All @@ -147,9 +173,8 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
..config.retry_config
};
tokio::spawn(async move {
let mut rpc =
SolanaRpcConnection::new_with_retry(url, None, Some(retry_config));
rpc.process_transaction(tx).await
let rpc = SolanaRpcConnection::new_with_retry(url, None, Some(retry_config));
rpc.send_transaction(&tx).await
})
});

Expand Down Expand Up @@ -217,7 +242,7 @@ impl<R: RpcConnection, I: Indexer<R>> TransactionBuilder for EpochManagerTransac
recent_blockhash: &Hash,
work_items: &[WorkItem],
config: BuildTransactionBatchConfig,
) -> Vec<Transaction> {
) -> Result<Vec<Transaction>> {
let mut transactions = vec![];
let (_, all_instructions) = fetch_proofs_and_create_instructions(
payer.pubkey(),
Expand All @@ -226,8 +251,7 @@ impl<R: RpcConnection, I: Indexer<R>> TransactionBuilder for EpochManagerTransac
self.epoch,
work_items,
)
.await
.unwrap();
.await?;
for instruction in all_instructions {
let transaction = build_signed_transaction(
payer,
Expand All @@ -239,7 +263,7 @@ impl<R: RpcConnection, I: Indexer<R>> TransactionBuilder for EpochManagerTransac
.await;
transactions.push(transaction);
}
transactions
Ok(transactions)
}
}

Expand Down

0 comments on commit 0911227

Please sign in to comment.