From f12720b2443d506190ae09bfd4ac071c986f3248 Mon Sep 17 00:00:00 2001
From: Sergey Timoshin <sergeytimoshin@proton.me>
Date: Sun, 8 Sep 2024 14:11:06 +0700
Subject: [PATCH 1/3] Add async support and retry logic to RPC methods

Added the 'async-trait' crate to provide async trait methods in RPC interfaces.
Implemented async versions of existing synchronous methods and added retry logic for better error handling in 'SolanaRpcConnection'.
---
 Cargo.lock                                    |   2 +
 .../programs/token-escrow/tests/test.rs       |   2 +-
 .../token-escrow/tests/test_compressed_pda.rs |   2 +-
 forester-utils/Cargo.toml                     |   1 +
 forester-utils/src/rpc/rpc_connection.rs      | 137 +++-----
 forester-utils/src/rpc/solana_rpc.rs          | 319 +++++++++++-------
 forester/src/rpc_pool.rs                      |   2 +-
 forester/src/tree_data_sync.rs                |   1 +
 test-programs/registry-test/tests/tests.rs    |   3 +
 test-utils/Cargo.toml                         |   1 +
 test-utils/src/e2e_test_env.rs                |   2 +-
 test-utils/src/rpc/test_rpc.rs                |  53 ++-
 test-utils/src/test_env.rs                    |   1 +
 13 files changed, 283 insertions(+), 243 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8022373cb..4f4afe368 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2357,6 +2357,7 @@ dependencies = [
  "account-compression",
  "anchor-lang",
  "anchor-spl",
+ "async-trait",
  "light-compressed-token",
  "light-concurrent-merkle-tree",
  "light-hash-set",
@@ -3745,6 +3746,7 @@ dependencies = [
  "anchor-spl",
  "anyhow",
  "ark-ff",
+ "async-trait",
  "forester-utils",
  "light-compressed-token",
  "light-concurrent-merkle-tree",
diff --git a/examples/token-escrow/programs/token-escrow/tests/test.rs b/examples/token-escrow/programs/token-escrow/tests/test.rs
index 19692ecea..150e984fd 100644
--- a/examples/token-escrow/programs/token-escrow/tests/test.rs
+++ b/examples/token-escrow/programs/token-escrow/tests/test.rs
@@ -162,7 +162,7 @@ async fn test_escrow_pda() {
 
     assert_rpc_error(result, 0, EscrowError::EscrowLocked.into()).unwrap();
 
-    rpc.warp_to_slot(1000).unwrap();
+    rpc.warp_to_slot(1000).await.unwrap();
     // try withdrawal with invalid signer
     let result = perform_withdrawal_failing(
         &mut rpc,
diff --git a/examples/token-escrow/programs/token-escrow/tests/test_compressed_pda.rs b/examples/token-escrow/programs/token-escrow/tests/test_compressed_pda.rs
index 82c42e485..146578f17 100644
--- a/examples/token-escrow/programs/token-escrow/tests/test_compressed_pda.rs
+++ b/examples/token-escrow/programs/token-escrow/tests/test_compressed_pda.rs
@@ -106,7 +106,7 @@ async fn test_escrow_with_compressed_pda() {
     let rpc_error = RpcError::TransactionError(transaction_error);
     assert!(matches!(result, Err(error) if error.to_string() == rpc_error.to_string()));
 
-    rpc.warp_to_slot(lockup_end + 1).unwrap();
+    rpc.warp_to_slot(lockup_end + 1).await.unwrap();
     perform_withdrawal_with_event(
         &mut rpc,
         &mut test_indexer,
diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml
index 885055947..71c030bdc 100644
--- a/forester-utils/Cargo.toml
+++ b/forester-utils/Cargo.toml
@@ -34,3 +34,4 @@ log = "0.4"
 num-bigint = "0.4.6"
 num-traits = "0.2.19"
 reqwest = "0.11.26"
+async-trait = "0.1.82"
\ No newline at end of file
diff --git a/forester-utils/src/rpc/rpc_connection.rs b/forester-utils/src/rpc/rpc_connection.rs
index 7472590a4..1a944f1f4 100644
--- a/forester-utils/src/rpc/rpc_connection.rs
+++ b/forester-utils/src/rpc/rpc_connection.rs
@@ -3,6 +3,7 @@ use crate::transaction_params::TransactionParams;
 use anchor_lang::solana_program::clock::Slot;
 use anchor_lang::solana_program::instruction::Instruction;
 use anchor_lang::AnchorDeserialize;
+use async_trait::async_trait;
 use solana_sdk::account::{Account, AccountSharedData};
 use solana_sdk::commitment_config::CommitmentConfig;
 use solana_sdk::epoch_info::EpochInfo;
@@ -12,126 +13,80 @@ use solana_sdk::signature::{Keypair, Signature};
 use solana_sdk::transaction::Transaction;
 use std::fmt::Debug;
 
+#[async_trait]
 pub trait RpcConnection: Send + Sync + Debug + 'static {
-    fn new<U: ToString>(_url: U, _commitment_config: Option<CommitmentConfig>) -> Self
+    fn new<U: ToString>(url: U, commitment_config: Option<CommitmentConfig>) -> Self
     where
-        Self: Sized,
-    {
-        unimplemented!()
-    }
-
-    fn health(&self) -> Result<(), RpcError> {
-        unimplemented!()
-    }
+        Self: Sized;
 
-    fn get_block_time(&self, _slot: u64) -> Result<i64, RpcError> {
-        unimplemented!()
-    }
+    fn get_payer(&self) -> &Keypair;
+    fn get_url(&self) -> String;
 
-    fn get_program_accounts(&self, program_id: &Pubkey)
-        -> Result<Vec<(Pubkey, Account)>, RpcError>;
+    async fn health(&self) -> Result<(), RpcError>;
+    async fn get_block_time(&self, slot: u64) -> Result<i64, RpcError>;
+    async fn get_epoch_info(&self) -> Result<EpochInfo, RpcError>;
 
-    fn process_transaction(
+    async fn get_program_accounts(
+        &self,
+        program_id: &Pubkey,
+    ) -> Result<Vec<(Pubkey, Account)>, RpcError>;
+    async fn process_transaction(
         &mut self,
         transaction: Transaction,
-    ) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send;
-
-    fn process_transaction_with_context(
+    ) -> Result<Signature, RpcError>;
+    async fn process_transaction_with_context(
         &mut self,
         transaction: Transaction,
-    ) -> impl std::future::Future<Output = Result<(Signature, Slot), RpcError>> + Send;
+    ) -> Result<(Signature, Slot), RpcError>;
 
-    fn create_and_send_transaction_with_event<T>(
+    async fn create_and_send_transaction_with_event<T>(
         &mut self,
-        instruction: &[Instruction],
+        instructions: &[Instruction],
         authority: &Pubkey,
         signers: &[&Keypair],
         transaction_params: Option<TransactionParams>,
-    ) -> impl std::future::Future<Output = Result<Option<(T, Signature, Slot)>, RpcError>> + Send
+    ) -> Result<Option<(T, Signature, Slot)>, RpcError>
     where
         T: AnchorDeserialize + Send + Debug;
 
-    fn create_and_send_transaction<'a>(
+    async fn create_and_send_transaction<'a>(
         &'a mut self,
-        instruction: &'a [Instruction],
+        instructions: &'a [Instruction],
         payer: &'a Pubkey,
         signers: &'a [&'a Keypair],
-    ) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send + 'a {
-        async move {
-            let blockhash = self.get_latest_blockhash().await?;
-            let transaction = Transaction::new_signed_with_payer(
-                instruction,
-                Some(payer),
-                &signers.to_vec(),
-                blockhash,
-            );
-            let signature = transaction.signatures[0];
-            self.process_transaction(transaction).await?;
-            Ok(signature)
-        }
+    ) -> Result<Signature, RpcError> {
+        let blockhash = self.get_latest_blockhash().await?;
+        let transaction =
+            Transaction::new_signed_with_payer(instructions, Some(payer), signers, blockhash);
+        self.process_transaction(transaction).await
     }
 
-    fn confirm_transaction(
-        &self,
-        transaction: Signature,
-    ) -> impl std::future::Future<Output = Result<bool, RpcError>> + Send;
-
-    fn get_payer(&self) -> &Keypair;
-    fn get_account(
-        &mut self,
-        address: Pubkey,
-    ) -> impl std::future::Future<Output = Result<Option<Account>, RpcError>> + Send;
+    async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError>;
+    async fn get_account(&mut self, address: Pubkey) -> Result<Option<Account>, RpcError>;
     fn set_account(&mut self, address: &Pubkey, account: &AccountSharedData);
-
-    fn get_minimum_balance_for_rent_exemption(
+    async fn get_minimum_balance_for_rent_exemption(
         &mut self,
         data_len: usize,
-    ) -> impl std::future::Future<Output = Result<u64, RpcError>> + Send;
+    ) -> Result<u64, RpcError>;
+    async fn airdrop_lamports(&mut self, to: &Pubkey, lamports: u64)
+        -> Result<Signature, RpcError>;
 
-    fn airdrop_lamports(
+    async fn get_anchor_account<T: AnchorDeserialize>(
         &mut self,
-        to: &Pubkey,
-        lamports: u64,
-    ) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send;
-
-    fn get_anchor_account<'a, T: AnchorDeserialize + 'static>(
-        &'a mut self,
-        pubkey: &'a Pubkey,
-    ) -> impl std::future::Future<Output = Result<Option<T>, RpcError>> + Send + 'a {
-        async move {
-            match self.get_account(*pubkey).await? {
-                Some(account) => {
-                    let data = T::deserialize(&mut &account.data[8..]).map_err(RpcError::from)?;
-                    Ok(Some(data))
-                }
-                None => Ok(None),
+        pubkey: &Pubkey,
+    ) -> Result<Option<T>, RpcError> {
+        match self.get_account(*pubkey).await? {
+            Some(account) => {
+                let data = T::deserialize(&mut &account.data[8..]).map_err(RpcError::from)?;
+                Ok(Some(data))
             }
+            None => Ok(None),
         }
     }
 
-    fn get_balance(
-        &mut self,
-        pubkey: &Pubkey,
-    ) -> impl std::future::Future<Output = Result<u64, RpcError>> + Send;
-
-    fn get_latest_blockhash(
-        &mut self,
-    ) -> impl std::future::Future<Output = Result<Hash, RpcError>> + Send;
-
-    fn get_slot(&mut self) -> impl std::future::Future<Output = Result<u64, RpcError>> + Send;
-
-    fn warp_to_slot(&mut self, _slot: Slot) -> Result<(), RpcError> {
-        unimplemented!()
-    }
-
-    fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
-        unimplemented!()
-    }
-
-    fn send_transaction(
-        &self,
-        transaction: &Transaction,
-    ) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send;
-
-    fn get_url(&self) -> String;
+    async fn get_balance(&mut self, pubkey: &Pubkey) -> Result<u64, RpcError>;
+    async fn get_latest_blockhash(&mut self) -> Result<Hash, RpcError>;
+    async fn get_slot(&mut self) -> Result<u64, RpcError>;
+    async fn warp_to_slot(&mut self, slot: Slot) -> Result<(), RpcError>;
+    async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError>;
 }
diff --git a/forester-utils/src/rpc/solana_rpc.rs b/forester-utils/src/rpc/solana_rpc.rs
index 8f17577c8..551a3649b 100644
--- a/forester-utils/src/rpc/solana_rpc.rs
+++ b/forester-utils/src/rpc/solana_rpc.rs
@@ -5,7 +5,8 @@ use anchor_lang::prelude::Pubkey;
 use anchor_lang::solana_program::clock::Slot;
 use anchor_lang::solana_program::hash::Hash;
 use anchor_lang::AnchorDeserialize;
-use log::{debug, warn};
+use async_trait::async_trait;
+use log::warn;
 use solana_client::rpc_client::RpcClient;
 use solana_client::rpc_config::{RpcSendTransactionConfig, RpcTransactionConfig};
 use solana_program_test::BanksClientError;
@@ -21,6 +22,7 @@ use solana_transaction_status::option_serializer::OptionSerializer;
 use solana_transaction_status::{UiInstruction, UiTransactionEncoding};
 use std::fmt::{Debug, Display, Formatter};
 use std::time::Duration;
+use tokio::time::sleep;
 
 pub enum SolanaRpcUrl {
     Testnet,
@@ -43,10 +45,26 @@ impl Display for SolanaRpcUrl {
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct RetryConfig {
+    max_retries: u32,
+    retry_delay: Duration,
+}
+
+impl Default for RetryConfig {
+    fn default() -> Self {
+        RetryConfig {
+            max_retries: 10,
+            retry_delay: Duration::from_millis(100),
+        }
+    }
+}
+
 #[allow(dead_code)]
 pub struct SolanaRpcConnection {
     pub client: RpcClient,
     pub payer: Keypair,
+    retry_config: RetryConfig,
 }
 
 impl Debug for SolanaRpcConnection {
@@ -59,6 +77,48 @@ impl Debug for SolanaRpcConnection {
     }
 }
 
+impl SolanaRpcConnection {
+    pub fn new_with_retry<U: ToString>(
+        url: U,
+        commitment_config: Option<CommitmentConfig>,
+        retry_config: Option<RetryConfig>,
+    ) -> Self {
+        let payer = Keypair::new();
+        let commitment_config = commitment_config.unwrap_or(CommitmentConfig::confirmed());
+        let client = RpcClient::new_with_commitment(url.to_string(), commitment_config);
+        let retry_config = retry_config.unwrap_or_default();
+        Self {
+            client,
+            payer,
+            retry_config,
+        }
+    }
+
+    async fn retry<F, Fut, T>(&self, operation: F) -> Result<T, RpcError>
+    where
+        F: Fn() -> Fut,
+        Fut: std::future::Future<Output = Result<T, RpcError>>,
+    {
+        let mut attempts = 0;
+        loop {
+            match operation().await {
+                Ok(result) => return Ok(result),
+                Err(e) => {
+                    attempts += 1;
+                    if attempts >= self.retry_config.max_retries {
+                        return Err(e);
+                    }
+                    warn!(
+                        "Operation failed, retrying in {:?}: {:?}",
+                        self.retry_config.retry_delay, e
+                    );
+                    sleep(self.retry_config.retry_delay).await;
+                }
+            }
+        }
+    }
+}
+
 impl SolanaRpcConnection {
     fn parse_inner_instructions<T: AnchorDeserialize>(
         &self,
@@ -124,68 +184,78 @@ impl SolanaRpcConnection {
     }
 }
 
+#[async_trait]
 impl RpcConnection for SolanaRpcConnection {
     fn new<U: ToString>(url: U, commitment_config: Option<CommitmentConfig>) -> Self
     where
         Self: Sized,
     {
-        let payer = Keypair::new();
-        let commitment_config = commitment_config.unwrap_or(CommitmentConfig::confirmed());
-        let client = RpcClient::new_with_commitment(url, commitment_config);
-        Self { client, payer }
+        Self::new_with_retry(url, commitment_config, None)
     }
 
-    fn health(&self) -> Result<(), RpcError> {
-        let result = self.client.get_health();
-        match result {
-            Ok(_) => Ok(()),
-            Err(e) => Err(RpcError::ClientError(e)),
-        }
+    fn get_payer(&self) -> &Keypair {
+        &self.payer
+    }
+
+    fn get_url(&self) -> String {
+        self.client.url()
+    }
+
+    async fn health(&self) -> Result<(), RpcError> {
+        self.retry(|| async { self.client.get_health().map_err(RpcError::from) })
+            .await
     }
 
-    fn get_block_time(&self, slot: u64) -> Result<UnixTimestamp, RpcError> {
-        self.client.get_block_time(slot).map_err(RpcError::from)
+    async fn get_block_time(&self, slot: u64) -> Result<UnixTimestamp, RpcError> {
+        self.retry(|| async { self.client.get_block_time(slot).map_err(RpcError::from) })
+            .await
     }
 
-    fn get_program_accounts(
+    async fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
+        self.retry(|| async { self.client.get_epoch_info().map_err(RpcError::from) })
+            .await
+    }
+
+    async fn get_program_accounts(
         &self,
         program_id: &Pubkey,
     ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
-        debug!(
-            "Fetching accounts for program: {}, client url: {}",
-            program_id,
-            self.client.url()
-        );
-        self.client
-            .get_program_accounts(program_id)
-            .map_err(RpcError::from)
+        self.retry(|| async {
+            self.client
+                .get_program_accounts(program_id)
+                .map_err(RpcError::from)
+        })
+        .await
     }
 
     async fn process_transaction(
         &mut self,
         transaction: Transaction,
     ) -> Result<Signature, RpcError> {
-        debug!("CommitmentConfig: {:?}", self.client.commitment());
-        match self.client.send_and_confirm_transaction(&transaction) {
-            Ok(signature) => Ok(signature),
-            Err(e) => Err(RpcError::ClientError(e)),
-        }
+        self.retry(|| async {
+            self.client
+                .send_and_confirm_transaction(&transaction)
+                .map_err(RpcError::from)
+        })
+        .await
     }
 
     async fn process_transaction_with_context(
         &mut self,
         transaction: Transaction,
     ) -> Result<(Signature, Slot), RpcError> {
-        debug!("CommitmentConfig: {:?}", self.client.commitment());
-        match self.client.send_and_confirm_transaction(&transaction) {
-            Ok(signature) => {
-                let sig_info = self.client.get_signature_statuses(&[signature]);
-                let sig_info = sig_info.unwrap().value.first().unwrap().clone();
-                let slot = sig_info.unwrap().slot;
-                Ok((signature, slot))
-            }
-            Err(e) => Err(RpcError::ClientError(e)),
-        }
+        self.retry(|| async {
+            let signature = self.client.send_and_confirm_transaction(&transaction)?;
+            let sig_info = self.client.get_signature_statuses(&[signature])?;
+            let slot = sig_info
+                .value
+                .first()
+                .and_then(|s| s.as_ref())
+                .map(|s| s.slot)
+                .ok_or_else(|| RpcError::CustomError("Failed to get slot".into()))?;
+            Ok((signature, slot))
+        })
+        .await
     }
 
     async fn create_and_send_transaction_with_event<T>(
@@ -196,7 +266,7 @@ impl RpcConnection for SolanaRpcConnection {
         transaction_params: Option<TransactionParams>,
     ) -> Result<Option<(T, Signature, u64)>, RpcError>
     where
-        T: AnchorDeserialize + Debug,
+        T: AnchorDeserialize + Send + Debug,
     {
         let pre_balance = self.client.get_balance(payer)?;
         let latest_blockhash = self.client.get_latest_blockhash()?;
@@ -212,28 +282,23 @@ impl RpcConnection for SolanaRpcConnection {
             signers,
             latest_blockhash,
         );
-        let signature = self.client.send_and_confirm_transaction(&transaction)?;
-        let sig_info = self.client.get_signature_statuses(&[signature]);
-        let sig_info = sig_info.unwrap().value.first().unwrap().clone();
-        let slot = sig_info.unwrap().slot;
-
-        let mut event = transaction
-            .message
-            .instructions
-            .iter()
-            .find_map(|instruction| T::try_from_slice(instruction.data.as_slice()).ok());
-
-        if event.is_none() {
-            let parsed_event: Result<T, RpcError> = self.parse_inner_instructions::<T>(signature);
-            event = match parsed_event {
-                Ok(e) => Some(e),
-                Err(e) => {
-                    println!("solana_rpc: error parsing inner instructions: {:?}", e);
-                    None
-                }
+
+        let (signature, slot) = self
+            .process_transaction_with_context(transaction.clone())
+            .await?;
+
+        let mut parsed_event = None;
+        for instruction in &transaction.message.instructions {
+            if let Ok(e) = T::deserialize(&mut &instruction.data[..]) {
+                parsed_event = Some(e);
+                break;
             }
         }
 
+        if parsed_event.is_none() {
+            parsed_event = self.parse_inner_instructions::<T>(signature).ok();
+        }
+
         if let Some(transaction_params) = transaction_params {
             let mut deduped_signers = signers.to_vec();
             deduped_signers.dedup();
@@ -272,40 +337,43 @@ impl RpcConnection for SolanaRpcConnection {
             }
         }
 
-        let result = event.map(|event| (event, signature, slot));
+        let result = parsed_event.map(|e| (e, signature, slot));
         Ok(result)
     }
 
-    async fn confirm_transaction(&self, transaction: Signature) -> Result<bool, RpcError> {
-        self.client
-            .confirm_transaction(&transaction)
-            .map_err(RpcError::from)
-    }
-
-    fn get_payer(&self) -> &Keypair {
-        &self.payer
+    async fn confirm_transaction(&self, signature: Signature) -> Result<bool, RpcError> {
+        self.retry(|| async {
+            self.client
+                .confirm_transaction(&signature)
+                .map_err(RpcError::from)
+        })
+        .await
     }
 
     async fn get_account(&mut self, address: Pubkey) -> Result<Option<Account>, RpcError> {
-        debug!("CommitmentConfig: {:?}", self.client.commitment());
-        let result = self
-            .client
-            .get_account_with_commitment(&address, self.client.commitment());
-        result.map(|account| account.value).map_err(RpcError::from)
+        self.retry(|| async {
+            self.client
+                .get_account_with_commitment(&address, self.client.commitment())
+                .map(|response| response.value)
+                .map_err(RpcError::from)
+        })
+        .await
     }
 
     fn set_account(&mut self, _address: &Pubkey, _account: &AccountSharedData) {
-        todo!()
+        unimplemented!()
     }
 
     async fn get_minimum_balance_for_rent_exemption(
         &mut self,
         data_len: usize,
     ) -> Result<u64, RpcError> {
-        match self.client.get_minimum_balance_for_rent_exemption(data_len) {
-            Ok(result) => Ok(result),
-            Err(e) => Err(RpcError::ClientError(e)),
-        }
+        self.retry(|| async {
+            self.client
+                .get_minimum_balance_for_rent_exemption(data_len)
+                .map_err(RpcError::from)
+        })
+        .await
     }
 
     async fn airdrop_lamports(
@@ -313,75 +381,64 @@ impl RpcConnection for SolanaRpcConnection {
         to: &Pubkey,
         lamports: u64,
     ) -> Result<Signature, RpcError> {
-        const MAX_RETRIES: u32 = 10;
-
-        for attempt in 0..MAX_RETRIES {
-            match self.client.request_airdrop(to, lamports) {
-                Ok(signature) => {
-                    println!("Airdrop signature: {:?}", signature);
-
-                    // Try to confirm the transaction
-                    for confirm_attempt in 0..MAX_RETRIES {
-                        if self
-                            .client
-                            .confirm_transaction_with_commitment(
-                                &signature,
-                                self.client.commitment(),
-                            )?
-                            .value
-                        {
-                            return Ok(signature);
-                        } else {
-                            warn!("Airdrop not confirmed, retrying confirmation...");
-                            tokio::time::sleep(Duration::from_secs(confirm_attempt as u64)).await;
-                        }
-                    }
-
-                    return Err(RpcError::CustomError(
-                        "Max retries reached for airdrop confirmation".into(),
-                    ));
+        self.retry(|| async {
+            let signature = self
+                .client
+                .request_airdrop(to, lamports)
+                .map_err(RpcError::ClientError)?;
+            println!("Airdrop signature: {:?}", signature);
+            self.retry(|| async {
+                if self
+                    .client
+                    .confirm_transaction_with_commitment(&signature, self.client.commitment())?
+                    .value
+                {
+                    Ok(())
+                } else {
+                    Err(RpcError::CustomError("Airdrop not confirmed".into()))
                 }
-                Err(err) => {
-                    warn!("Airdrop request failed with error: {:?}", err);
-                }
-            }
-            tokio::time::sleep(Duration::from_secs(attempt as u64)).await;
-        }
-        Err(RpcError::CustomError(
-            "Max retries reached for airdrop request".into(),
-        ))
+            })
+            .await?;
+
+            Ok(signature)
+        })
+        .await
     }
 
     async fn get_balance(&mut self, pubkey: &Pubkey) -> Result<u64, RpcError> {
-        self.client.get_balance(pubkey).map_err(RpcError::from)
+        self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) })
+            .await
     }
 
     async fn get_latest_blockhash(&mut self) -> Result<Hash, RpcError> {
-        self.client.get_latest_blockhash().map_err(RpcError::from)
+        self.retry(|| async { self.client.get_latest_blockhash().map_err(RpcError::from) })
+            .await
     }
 
     async fn get_slot(&mut self) -> Result<u64, RpcError> {
-        self.client.get_slot().map_err(RpcError::from)
+        self.retry(|| async { self.client.get_slot().map_err(RpcError::from) })
+            .await
     }
 
-    fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
-        self.client.get_epoch_info().map_err(RpcError::from)
+    async fn warp_to_slot(&mut self, _slot: Slot) -> Result<(), RpcError> {
+        Err(RpcError::CustomError(
+            "Warp to slot is not supported in SolanaRpcConnection".to_string(),
+        ))
     }
 
     async fn send_transaction(&self, transaction: &Transaction) -> Result<Signature, RpcError> {
-        self.client
-            .send_transaction_with_config(
-                transaction,
-                RpcSendTransactionConfig {
-                    skip_preflight: true,
-                    max_retries: Some(3),
-                    ..Default::default()
-                },
-            )
-            .map_err(RpcError::from)
-    }
-
-    fn get_url(&self) -> String {
-        self.client.url()
+        self.retry(|| async {
+            self.client
+                .send_transaction_with_config(
+                    transaction,
+                    RpcSendTransactionConfig {
+                        skip_preflight: true,
+                        max_retries: Some(self.retry_config.max_retries as usize),
+                        ..Default::default()
+                    },
+                )
+                .map_err(RpcError::from)
+        })
+        .await
     }
 }
diff --git a/forester/src/rpc_pool.rs b/forester/src/rpc_pool.rs
index be41631cb..4aedc55d8 100644
--- a/forester/src/rpc_pool.rs
+++ b/forester/src/rpc_pool.rs
@@ -42,7 +42,7 @@ impl<R: RpcConnection> bb8::ManageConnection for SolanaConnectionManager<R> {
     }
 
     async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        conn.health().map_err(PoolError::RpcRequest)
+        conn.health().await.map_err(PoolError::RpcRequest)
     }
 
     fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
diff --git a/forester/src/tree_data_sync.rs b/forester/src/tree_data_sync.rs
index 77d495f01..101e165e3 100644
--- a/forester/src/tree_data_sync.rs
+++ b/forester/src/tree_data_sync.rs
@@ -12,6 +12,7 @@ pub async fn fetch_trees<R: RpcConnection>(rpc: &R) -> Vec<TreeAccounts> {
     let program_id = account_compression::id();
     debug!("Fetching accounts for program: {}", program_id);
     rpc.get_program_accounts(&program_id)
+        .await
         .unwrap()
         .into_iter()
         .filter_map(|(pubkey, account)| process_account(pubkey, account))
diff --git a/test-programs/registry-test/tests/tests.rs b/test-programs/registry-test/tests/tests.rs
index 5d3b105be..327271c11 100644
--- a/test-programs/registry-test/tests/tests.rs
+++ b/test-programs/registry-test/tests/tests.rs
@@ -627,6 +627,7 @@ async fn test_register_and_update_forester_pda() {
 
     // advance epoch to active phase
     rpc.warp_to_slot(registered_epoch.phases.active.start)
+        .await
         .unwrap();
     // finalize registration
     {
@@ -692,6 +693,7 @@ async fn test_register_and_update_forester_pda() {
     rpc.warp_to_slot(
         registered_epoch.phases.report_work.start - protocol_config.registration_phase_length,
     )
+    .await
     .unwrap();
     // register for next epoch
     let next_registered_epoch = Epoch::register(&mut rpc, &protocol_config, &forester_keypair)
@@ -709,6 +711,7 @@ async fn test_register_and_update_forester_pda() {
     )
     .await;
     rpc.warp_to_slot(registered_epoch.phases.report_work.start)
+        .await
         .unwrap();
     // report work
     {
diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml
index 132cee8b1..cf8bb3f4e 100644
--- a/test-utils/Cargo.toml
+++ b/test-utils/Cargo.toml
@@ -43,6 +43,7 @@ rand = "0.8"
 photon-api = { path = "../photon-api", version = "0.31.0" }
 log = "0.4"
 serde = { version = "1.0.197", features = ["derive"] }
+async-trait = "0.1.82"
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/test-utils/src/e2e_test_env.rs b/test-utils/src/e2e_test_env.rs
index a478de6c6..c2266f36e 100644
--- a/test-utils/src/e2e_test_env.rs
+++ b/test-utils/src/e2e_test_env.rs
@@ -578,7 +578,7 @@ where
             let new_slot = current_solana_slot + self.protocol_config.slot_length;
             println!("advanced slot from {} to {}", self.slot, current_light_slot);
             println!("solana slot from {} to {}", current_solana_slot, new_slot);
-            self.rpc.warp_to_slot(new_slot).unwrap();
+            self.rpc.warp_to_slot(new_slot).await.unwrap();
 
             self.slot = current_light_slot + 1;
 
diff --git a/test-utils/src/rpc/test_rpc.rs b/test-utils/src/rpc/test_rpc.rs
index c533a482b..7e8e2b8bc 100644
--- a/test-utils/src/rpc/test_rpc.rs
+++ b/test-utils/src/rpc/test_rpc.rs
@@ -5,8 +5,11 @@ use anchor_lang::solana_program::clock::Slot;
 use anchor_lang::solana_program::hash::Hash;
 use anchor_lang::solana_program::system_instruction;
 use anchor_lang::AnchorDeserialize;
+use async_trait::async_trait;
 use solana_program_test::{BanksClientError, ProgramTestContext};
 use solana_sdk::account::{Account, AccountSharedData};
+use solana_sdk::commitment_config::CommitmentConfig;
+use solana_sdk::epoch_info::EpochInfo;
 use solana_sdk::instruction::{Instruction, InstructionError};
 use solana_sdk::signature::{Keypair, Signature};
 use solana_sdk::signer::Signer;
@@ -26,8 +29,36 @@ impl Debug for ProgramTestRpcConnection {
     }
 }
 
+#[async_trait]
 impl RpcConnection for ProgramTestRpcConnection {
-    fn get_program_accounts(
+    fn new<U: ToString>(_url: U, _commitment_config: Option<CommitmentConfig>) -> Self
+    where
+        Self: Sized,
+    {
+        unimplemented!()
+    }
+
+    fn get_payer(&self) -> &Keypair {
+        &self.context.payer
+    }
+
+    fn get_url(&self) -> String {
+        unimplemented!("get_url doesn't make sense for ProgramTestRpcConnection")
+    }
+
+    async fn health(&self) -> Result<(), RpcError> {
+        unimplemented!()
+    }
+
+    async fn get_block_time(&self, _slot: u64) -> Result<i64, RpcError> {
+        unimplemented!()
+    }
+
+    async fn get_epoch_info(&self) -> Result<EpochInfo, RpcError> {
+        unimplemented!()
+    }
+
+    async fn get_program_accounts(
         &self,
         _program_id: &Pubkey,
     ) -> Result<Vec<(Pubkey, Account)>, RpcError> {
@@ -73,7 +104,7 @@ impl RpcConnection for ProgramTestRpcConnection {
         transaction_params: Option<TransactionParams>,
     ) -> Result<Option<(T, Signature, Slot)>, RpcError>
     where
-        T: AnchorDeserialize,
+        T: AnchorDeserialize + Send + Debug,
     {
         let pre_balance = self
             .context
@@ -187,10 +218,6 @@ impl RpcConnection for ProgramTestRpcConnection {
         Ok(true)
     }
 
-    fn get_payer(&self) -> &Keypair {
-        &self.context.payer
-    }
-
     async fn get_account(&mut self, address: Pubkey) -> Result<Option<Account>, RpcError> {
         self.context
             .banks_client
@@ -267,19 +294,11 @@ impl RpcConnection for ProgramTestRpcConnection {
             .map_err(RpcError::from)
     }
 
-    fn warp_to_slot(&mut self, slot: Slot) -> Result<(), RpcError> {
+    async fn warp_to_slot(&mut self, slot: Slot) -> Result<(), RpcError> {
         self.context.warp_to_slot(slot).map_err(RpcError::from)
     }
 
-    #[allow(clippy::manual_async_fn)]
-    fn send_transaction(
-        &self,
-        _transaction: &Transaction,
-    ) -> impl std::future::Future<Output = Result<Signature, RpcError>> + Send {
-        async { unimplemented!("send transaction is unimplemented for ProgramTestRpcConnection") }
-    }
-
-    fn get_url(&self) -> String {
-        unimplemented!("get_url doesn't make sense for ProgramTestRpcConnection")
+    async fn send_transaction(&self, _transaction: &Transaction) -> Result<Signature, RpcError> {
+        unimplemented!("send transaction is unimplemented for ProgramTestRpcConnection")
     }
 }
diff --git a/test-utils/src/test_env.rs b/test-utils/src/test_env.rs
index cfed50d21..ba4eba266 100644
--- a/test-utils/src/test_env.rs
+++ b/test-utils/src/test_env.rs
@@ -443,6 +443,7 @@ pub async fn initialize_accounts<R: RpcConnection>(
             .unwrap();
         context
             .warp_to_slot(registered_epoch.phases.active.start)
+            .await
             .unwrap();
         let tree_accounts = vec![
             TreeAccounts {

From bca013016c8bd86f1aea9afe5d89962aad1796ea Mon Sep 17 00:00:00 2001
From: Sergey Timoshin <sergeytimoshin@proton.me>
Date: Sun, 8 Sep 2024 17:07:36 +0700
Subject: [PATCH 2/3] Switch to async_trait for TransactionBuilder methods

The `build_signed_transaction_batch` method in `TransactionBuilder` now uses `async_trait` to support async/await syntax. This change ensures better concurrency handling and simplifies the implementation of asynchronous code within the trait. Additionally, added a TODO comment about optimizing retry logic for sending transactions.
---
 forester/src/send_transaction.rs | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs
index 45e4cbb4f..244f6a2d9 100644
--- a/forester/src/send_transaction.rs
+++ b/forester/src/send_transaction.rs
@@ -26,19 +26,21 @@ use solana_sdk::{
 };
 use std::sync::Arc;
 use std::{time::Duration, vec};
+use async_trait::async_trait;
 use tokio::join;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
 use tracing::{debug, warn};
 
+#[async_trait]
 pub trait TransactionBuilder {
-    fn build_signed_transaction_batch(
+    async fn build_signed_transaction_batch(
         &self,
         payer: &Keypair,
         recent_blockhash: &Hash,
         work_items: &[WorkItem],
         config: BuildTransactionBatchConfig,
-    ) -> impl std::future::Future<Output = Vec<Transaction>> + Send;
+    ) -> Vec<Transaction>;
 }
 
 /// Setting:
@@ -186,6 +188,8 @@ pub struct RetryConfig {
     pub global_timeout: u128,
 }
 
+// TODO: We have retry logic for send_transaction in solana_rpc. We should use that,
+// otherwise we'd do unnecessary retries.
 /// Sends a transaction and retries if not confirmed after retry wait time.
 /// Stops retrying at the global timeout (end of light slot).
 pub async fn send_signed_transaction(
@@ -240,6 +244,7 @@ pub struct EpochManagerTransactions<R: RpcConnection, I: Indexer<R>> {
     pub phantom: std::marker::PhantomData<R>,
 }
 
+#[async_trait]
 impl<R: RpcConnection, I: Indexer<R>> TransactionBuilder for EpochManagerTransactions<R, I> {
     async fn build_signed_transaction_batch(
         &self,

From 90b581c510be61472d10a436db7476ed79e52a67 Mon Sep 17 00:00:00 2001
From: Sergey Timoshin <sergeytimoshin@proton.me>
Date: Sun, 8 Sep 2024 17:15:00 +0700
Subject: [PATCH 3/3] Reorganize imports in send_transaction.rs

Move `async_trait::async_trait` to the correct location. This change improves code readability by maintaining a consistent import ordering.
---
 forester/src/send_transaction.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs
index 244f6a2d9..65753d22a 100644
--- a/forester/src/send_transaction.rs
+++ b/forester/src/send_transaction.rs
@@ -8,6 +8,7 @@ use account_compression::utils::constants::{
     ADDRESS_MERKLE_TREE_CHANGELOG, ADDRESS_MERKLE_TREE_INDEXED_CHANGELOG,
     STATE_MERKLE_TREE_CHANGELOG,
 };
+use async_trait::async_trait;
 use forester_utils::forester_epoch::{TreeAccounts, TreeType};
 use forester_utils::indexer::Indexer;
 use forester_utils::rpc::{RpcConnection, RpcError, SolanaRpcConnection};
@@ -26,7 +27,6 @@ use solana_sdk::{
 };
 use std::sync::Arc;
 use std::{time::Duration, vec};
-use async_trait::async_trait;
 use tokio::join;
 use tokio::sync::Mutex;
 use tokio::time::sleep;