diff --git a/forester/forester.toml b/forester/forester.toml index c42763eef5..2dde4004ad 100644 --- a/forester/forester.toml +++ b/forester/forester.toml @@ -7,4 +7,8 @@ INDEXER_BATCH_SIZE=50 INDEXER_MAX_CONCURRENT_BATCHES=50 TRANSACTION_BATCH_SIZE=1 TRANSACTION_MAX_CONCURRENT_BATCHES=50 -SLOT_UPDATE_INTERVAL_SECONDS=30 \ No newline at end of file +SLOT_UPDATE_INTERVAL_SECONDS=30 +STATE_QUEUE_START_INDEX=0 +STATE_QUEUE_LENGTH=28807 +ADDRESS_QUEUE_START_INDEX=0 +ADDRESS_QUEUE_LENGTH=28807 \ No newline at end of file diff --git a/forester/src/config.rs b/forester/src/config.rs index 882a72a96a..2153b3bca9 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -1,3 +1,4 @@ +use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES}; use forester_utils::forester_epoch::{Epoch, TreeAccounts, TreeForesterSchedule}; use forester_utils::rpc::RetryConfig; use light_registry::{EpochPda, ForesterEpochPda}; @@ -40,6 +41,7 @@ impl ForesterEpochInfo { pub struct ForesterConfig { pub external_services: ExternalServicesConfig, pub retry_config: RetryConfig, + pub queue_config: QueueConfig, pub registry_pubkey: Pubkey, pub payer_keypair: Keypair, pub cu_limit: u32, @@ -59,6 +61,7 @@ impl Clone for ForesterConfig { Self { external_services: self.external_services.clone(), retry_config: self.retry_config, + queue_config: self.queue_config, registry_pubkey: self.registry_pubkey, payer_keypair: Keypair::from_bytes(&self.payer_keypair.to_bytes()).unwrap(), cu_limit: self.cu_limit, @@ -85,3 +88,22 @@ pub struct ExternalServicesConfig { pub derivation: String, pub pushgateway_url: String, } + +#[derive(Debug, Clone, Copy)] +pub struct QueueConfig { + pub state_queue_start_index: u16, + pub state_queue_length: u16, + pub address_queue_start_index: u16, + pub address_queue_length: u16, +} + +impl Default for QueueConfig { + fn default() -> Self { + QueueConfig { + state_queue_start_index: 0, + state_queue_length: STATE_NULLIFIER_QUEUE_VALUES, + address_queue_start_index: 0, + address_queue_length: ADDRESS_QUEUE_VALUES, + } + } +} diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index fd565d6233..de162c6bd2 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -709,17 +709,19 @@ impl> EpochManager { // TODO: measure accuracy // Optional replace with shutdown signal for all child processes - let config = SendBatchedTransactionsConfig { + let batched_tx_config = SendBatchedTransactionsConfig { num_batches: 10, build_transaction_batch_config: BuildTransactionBatchConfig { batch_size: 50, // TODO: make batch size configurable and or dynamic based on queue usage compute_unit_price: None, // Make dynamic based on queue usage compute_unit_limit: Some(1_000_000), }, + queue_config: self.config.queue_config, retry_config: RetryConfig { timeout: light_slot_timeout, ..self.config.retry_config }, + light_slot_length: epoch_pda.protocol_config.slot_length, }; @@ -734,7 +736,7 @@ impl> EpochManager { let batch_tx_future = send_batched_transactions( &self.config.payer_keypair, self.rpc_pool.clone(), - &config, // TODO: define config in epoch manager + &batched_tx_config, // TODO: define config in epoch manager tree.tree_accounts, &transaction_builder, ); diff --git a/forester/src/lib.rs b/forester/src/lib.rs index ba7521ebd6..c76406aab6 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -24,6 +24,7 @@ use crate::queue_helpers::fetch_queue_item_data; use crate::rpc_pool::SolanaRpcPool; use crate::slot_tracker::SlotTracker; use crate::utils::get_protocol_config; +use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES}; pub use config::{ForesterConfig, ForesterEpochInfo}; use forester_utils::forester_epoch::{TreeAccounts, TreeType}; use forester_utils::indexer::Indexer; @@ -48,7 +49,13 @@ pub async fn run_queue_info( .collect(); for tree_data in trees { - let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue) + let length = if tree_data.tree_type == TreeType::State { + STATE_NULLIFIER_QUEUE_VALUES + } else { + ADDRESS_QUEUE_VALUES + }; + + let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0, length, length) .await .unwrap() .len(); diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index 24507c9cf7..0cc2514b76 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -15,6 +15,9 @@ pub struct QueueItemData { pub async fn fetch_queue_item_data( rpc: &mut R, queue_pubkey: &Pubkey, + start_index: u16, + length: u16, + queue_length: u16, ) -> Result> { debug!("Fetching queue data for {:?}", queue_pubkey); let mut account = rpc @@ -24,17 +27,18 @@ pub async fn fetch_queue_item_data( let queue: HashSet = unsafe { HashSet::from_bytes_copy(&mut account.data[8 + mem::size_of::()..])? }; + let end_index = (start_index + length).min(queue_length); + let filtered_queue = queue .iter() - .filter_map(|(index, cell)| { - if cell.sequence_number.is_none() { - Some(QueueItemData { - hash: cell.value_bytes(), - index, - }) - } else { - None - } + .filter(|(index, cell)| { + *index >= start_index as usize + && *index < end_index as usize + && cell.sequence_number.is_none() + }) + .map(|(index, cell)| QueueItemData { + hash: cell.value_bytes(), + index, }) .collect(); debug!("Queue data fetched: {:?}", filtered_queue); diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 054706015f..8ea510bf23 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -1,11 +1,12 @@ +use crate::config::QueueConfig; use crate::epoch_manager::{MerkleProofType, WorkItem}; use crate::errors::ForesterError; use crate::queue_helpers::fetch_queue_item_data; use crate::rpc_pool::SolanaRpcPool; use crate::Result; use account_compression::utils::constants::{ - ADDRESS_MERKLE_TREE_CHANGELOG, ADDRESS_MERKLE_TREE_INDEXED_CHANGELOG, - STATE_MERKLE_TREE_CHANGELOG, + ADDRESS_MERKLE_TREE_CHANGELOG, ADDRESS_MERKLE_TREE_INDEXED_CHANGELOG, ADDRESS_QUEUE_VALUES, + STATE_MERKLE_TREE_CHANGELOG, STATE_NULLIFIER_QUEUE_VALUES, }; use async_trait::async_trait; use forester_utils::forester_epoch::{TreeAccounts, TreeType}; @@ -93,7 +94,29 @@ pub async fn send_batched_transactions( while num_batches < config.num_batches && start_time.elapsed() < config.retry_config.timeout { debug!("Sending batch: {}", num_batches); // 2. Fetch queue items. - let queue_item_data = fetch_queue_item_data(&mut *rpc, &tree_accounts.queue).await?; + let queue_length = if tree_accounts.tree_type == TreeType::State { + STATE_NULLIFIER_QUEUE_VALUES + } else { + ADDRESS_QUEUE_VALUES + }; + let start_index = if tree_accounts.tree_type == TreeType::State { + config.queue_config.state_queue_start_index + } else { + config.queue_config.address_queue_start_index + }; + let length = if tree_accounts.tree_type == TreeType::State { + config.queue_config.state_queue_length + } else { + config.queue_config.address_queue_length + }; + let queue_item_data = fetch_queue_item_data( + &mut *rpc, + &tree_accounts.queue, + start_index, + length, + queue_length, + ) + .await?; let work_items: Vec = queue_item_data .into_iter() .map(|data| WorkItem { @@ -217,6 +240,7 @@ pub async fn send_batched_transactions( pub struct SendBatchedTransactionsConfig { pub num_batches: u64, pub build_transaction_batch_config: BuildTransactionBatchConfig, + pub queue_config: QueueConfig, pub retry_config: RetryConfig, pub light_slot_length: u64, } diff --git a/forester/src/settings.rs b/forester/src/settings.rs index b8a10312f1..20997b296c 100644 --- a/forester/src/settings.rs +++ b/forester/src/settings.rs @@ -1,4 +1,4 @@ -use crate::config::ExternalServicesConfig; +use crate::config::{ExternalServicesConfig, QueueConfig}; use crate::errors::ForesterError; use crate::ForesterConfig; use account_compression::initialize_address_merkle_tree::Pubkey; @@ -31,6 +31,10 @@ pub enum SettingsKey { CULimit, RpcPoolSize, SlotUpdateIntervalSeconds, + StateQueueStartIndex, + StateQueueLength, + AddressQueueStartIndex, + AddressQueueLength, } impl Display for SettingsKey { @@ -57,6 +61,10 @@ impl Display for SettingsKey { SettingsKey::CULimit => "CU_LIMIT", SettingsKey::RpcPoolSize => "RPC_POOL_SIZE", SettingsKey::SlotUpdateIntervalSeconds => "SLOT_UPDATE_INTERVAL_SECONDS", + SettingsKey::StateQueueStartIndex => "STATE_QUEUE_START_INDEX", + SettingsKey::StateQueueLength => "STATE_QUEUE_LENGTH", + SettingsKey::AddressQueueStartIndex => "ADDRESS_QUEUE_START_INDEX", + SettingsKey::AddressQueueLength => "ADDRESS_QUEUE_LENGTH", } ) } @@ -130,6 +138,18 @@ pub fn init_config(enable_metrics: bool) -> Result ForesterConfig { pushgateway_url: "http://localhost:9092/metrics/job/forester".to_string(), }, retry_config: Default::default(), + queue_config: Default::default(), registry_pubkey: light_registry::ID, payer_keypair: env_accounts.forester.insecure_clone(), indexer_batch_size: 50,