Skip to content

Commit

Permalink
Refactor queue fetching logic to support configurable indices (#1200)
Browse files Browse the repository at this point in the history
Enhanced fetch_queue_item_data to accept start index, length, and queue length parameters. Updated configuration to include queue settings and adjusted test cases accordingly.
  • Loading branch information
sergeytimoshin authored Sep 11, 2024
1 parent c96889a commit 38e4c46
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 25 deletions.
6 changes: 5 additions & 1 deletion forester/forester.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ INDEXER_BATCH_SIZE=50
INDEXER_MAX_CONCURRENT_BATCHES=50
TRANSACTION_BATCH_SIZE=1
TRANSACTION_MAX_CONCURRENT_BATCHES=50
SLOT_UPDATE_INTERVAL_SECONDS=30
SLOT_UPDATE_INTERVAL_SECONDS=30
STATE_QUEUE_START_INDEX=0
STATE_QUEUE_LENGTH=28807
ADDRESS_QUEUE_START_INDEX=0
ADDRESS_QUEUE_LENGTH=28807
22 changes: 22 additions & 0 deletions forester/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES};
use forester_utils::forester_epoch::{Epoch, TreeAccounts, TreeForesterSchedule};
use forester_utils::rpc::RetryConfig;
use light_registry::{EpochPda, ForesterEpochPda};
Expand Down Expand Up @@ -40,6 +41,7 @@ impl ForesterEpochInfo {
pub struct ForesterConfig {
pub external_services: ExternalServicesConfig,
pub retry_config: RetryConfig,
pub queue_config: QueueConfig,
pub registry_pubkey: Pubkey,
pub payer_keypair: Keypair,
pub cu_limit: u32,
Expand All @@ -59,6 +61,7 @@ impl Clone for ForesterConfig {
Self {
external_services: self.external_services.clone(),
retry_config: self.retry_config,
queue_config: self.queue_config,
registry_pubkey: self.registry_pubkey,
payer_keypair: Keypair::from_bytes(&self.payer_keypair.to_bytes()).unwrap(),
cu_limit: self.cu_limit,
Expand All @@ -85,3 +88,22 @@ pub struct ExternalServicesConfig {
pub derivation: String,
pub pushgateway_url: String,
}

#[derive(Debug, Clone, Copy)]
pub struct QueueConfig {
pub state_queue_start_index: u16,
pub state_queue_length: u16,
pub address_queue_start_index: u16,
pub address_queue_length: u16,
}

impl Default for QueueConfig {
fn default() -> Self {
QueueConfig {
state_queue_start_index: 0,
state_queue_length: STATE_NULLIFIER_QUEUE_VALUES,
address_queue_start_index: 0,
address_queue_length: ADDRESS_QUEUE_VALUES,
}
}
}
6 changes: 4 additions & 2 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,17 +709,19 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {

// TODO: measure accuracy
// Optional replace with shutdown signal for all child processes
let config = SendBatchedTransactionsConfig {
let batched_tx_config = SendBatchedTransactionsConfig {
num_batches: 10,
build_transaction_batch_config: BuildTransactionBatchConfig {
batch_size: 50, // TODO: make batch size configurable and or dynamic based on queue usage
compute_unit_price: None, // Make dynamic based on queue usage
compute_unit_limit: Some(1_000_000),
},
queue_config: self.config.queue_config,
retry_config: RetryConfig {
timeout: light_slot_timeout,
..self.config.retry_config
},

light_slot_length: epoch_pda.protocol_config.slot_length,
};

Expand All @@ -734,7 +736,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
let batch_tx_future = send_batched_transactions(
&self.config.payer_keypair,
self.rpc_pool.clone(),
&config, // TODO: define config in epoch manager
&batched_tx_config, // TODO: define config in epoch manager
tree.tree_accounts,
&transaction_builder,
);
Expand Down
9 changes: 8 additions & 1 deletion forester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::queue_helpers::fetch_queue_item_data;
use crate::rpc_pool::SolanaRpcPool;
use crate::slot_tracker::SlotTracker;
use crate::utils::get_protocol_config;
use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES};
pub use config::{ForesterConfig, ForesterEpochInfo};
use forester_utils::forester_epoch::{TreeAccounts, TreeType};
use forester_utils::indexer::Indexer;
Expand All @@ -48,7 +49,13 @@ pub async fn run_queue_info(
.collect();

for tree_data in trees {
let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue)
let length = if tree_data.tree_type == TreeType::State {
STATE_NULLIFIER_QUEUE_VALUES
} else {
ADDRESS_QUEUE_VALUES
};

let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0, length, length)
.await
.unwrap()
.len();
Expand Down
22 changes: 13 additions & 9 deletions forester/src/queue_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub struct QueueItemData {
pub async fn fetch_queue_item_data<R: RpcConnection>(
rpc: &mut R,
queue_pubkey: &Pubkey,
start_index: u16,
length: u16,
queue_length: u16,
) -> Result<Vec<QueueItemData>> {
debug!("Fetching queue data for {:?}", queue_pubkey);
let mut account = rpc
Expand All @@ -24,17 +27,18 @@ pub async fn fetch_queue_item_data<R: RpcConnection>(
let queue: HashSet = unsafe {
HashSet::from_bytes_copy(&mut account.data[8 + mem::size_of::<QueueAccount>()..])?
};
let end_index = (start_index + length).min(queue_length);

let filtered_queue = queue
.iter()
.filter_map(|(index, cell)| {
if cell.sequence_number.is_none() {
Some(QueueItemData {
hash: cell.value_bytes(),
index,
})
} else {
None
}
.filter(|(index, cell)| {
*index >= start_index as usize
&& *index < end_index as usize
&& cell.sequence_number.is_none()
})
.map(|(index, cell)| QueueItemData {
hash: cell.value_bytes(),
index,
})
.collect();
debug!("Queue data fetched: {:?}", filtered_queue);
Expand Down
30 changes: 27 additions & 3 deletions forester/src/send_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::config::QueueConfig;
use crate::epoch_manager::{MerkleProofType, WorkItem};
use crate::errors::ForesterError;
use crate::queue_helpers::fetch_queue_item_data;
use crate::rpc_pool::SolanaRpcPool;
use crate::Result;
use account_compression::utils::constants::{
ADDRESS_MERKLE_TREE_CHANGELOG, ADDRESS_MERKLE_TREE_INDEXED_CHANGELOG,
STATE_MERKLE_TREE_CHANGELOG,
ADDRESS_MERKLE_TREE_CHANGELOG, ADDRESS_MERKLE_TREE_INDEXED_CHANGELOG, ADDRESS_QUEUE_VALUES,
STATE_MERKLE_TREE_CHANGELOG, STATE_NULLIFIER_QUEUE_VALUES,
};
use async_trait::async_trait;
use forester_utils::forester_epoch::{TreeAccounts, TreeType};
Expand Down Expand Up @@ -93,7 +94,29 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
while num_batches < config.num_batches && start_time.elapsed() < config.retry_config.timeout {
debug!("Sending batch: {}", num_batches);
// 2. Fetch queue items.
let queue_item_data = fetch_queue_item_data(&mut *rpc, &tree_accounts.queue).await?;
let queue_length = if tree_accounts.tree_type == TreeType::State {
STATE_NULLIFIER_QUEUE_VALUES
} else {
ADDRESS_QUEUE_VALUES
};
let start_index = if tree_accounts.tree_type == TreeType::State {
config.queue_config.state_queue_start_index
} else {
config.queue_config.address_queue_start_index
};
let length = if tree_accounts.tree_type == TreeType::State {
config.queue_config.state_queue_length
} else {
config.queue_config.address_queue_length
};
let queue_item_data = fetch_queue_item_data(
&mut *rpc,
&tree_accounts.queue,
start_index,
length,
queue_length,
)
.await?;
let work_items: Vec<WorkItem> = queue_item_data
.into_iter()
.map(|data| WorkItem {
Expand Down Expand Up @@ -217,6 +240,7 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
pub struct SendBatchedTransactionsConfig {
pub num_batches: u64,
pub build_transaction_batch_config: BuildTransactionBatchConfig,
pub queue_config: QueueConfig,
pub retry_config: RetryConfig,
pub light_slot_length: u64,
}
Expand Down
22 changes: 21 additions & 1 deletion forester/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::ExternalServicesConfig;
use crate::config::{ExternalServicesConfig, QueueConfig};
use crate::errors::ForesterError;
use crate::ForesterConfig;
use account_compression::initialize_address_merkle_tree::Pubkey;
Expand Down Expand Up @@ -31,6 +31,10 @@ pub enum SettingsKey {
CULimit,
RpcPoolSize,
SlotUpdateIntervalSeconds,
StateQueueStartIndex,
StateQueueLength,
AddressQueueStartIndex,
AddressQueueLength,
}

impl Display for SettingsKey {
Expand All @@ -57,6 +61,10 @@ impl Display for SettingsKey {
SettingsKey::CULimit => "CU_LIMIT",
SettingsKey::RpcPoolSize => "RPC_POOL_SIZE",
SettingsKey::SlotUpdateIntervalSeconds => "SLOT_UPDATE_INTERVAL_SECONDS",
SettingsKey::StateQueueStartIndex => "STATE_QUEUE_START_INDEX",
SettingsKey::StateQueueLength => "STATE_QUEUE_LENGTH",
SettingsKey::AddressQueueStartIndex => "ADDRESS_QUEUE_START_INDEX",
SettingsKey::AddressQueueLength => "ADDRESS_QUEUE_LENGTH",
}
)
}
Expand Down Expand Up @@ -130,6 +138,18 @@ pub fn init_config(enable_metrics: bool) -> Result<ForesterConfig, ForesterError
settings.get_int(&SettingsKey::Timeout.to_string())? as u64
),
},
queue_config: QueueConfig {
state_queue_start_index: settings
.get_int(&SettingsKey::StateQueueStartIndex.to_string())?
as u16,
state_queue_length: settings.get_int(&SettingsKey::StateQueueLength.to_string())?
as u16,
address_queue_start_index: settings
.get_int(&SettingsKey::AddressQueueStartIndex.to_string())?
as u16,
address_queue_length: settings.get_int(&SettingsKey::AddressQueueLength.to_string())?
as u16,
},
registry_pubkey: Pubkey::from_str(&registry_pubkey)
.map_err(|e| ForesterError::ConfigError(e.to_string()))?,
payer_keypair: payer,
Expand Down
29 changes: 21 additions & 8 deletions forester/tests/e2e_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES};
use account_compression::AddressMerkleTreeAccount;
use forester::queue_helpers::fetch_queue_item_data;
use forester::rpc_pool::SolanaRpcPool;
Expand Down Expand Up @@ -199,10 +200,16 @@ pub async fn assert_queue_len(
) {
for tree in state_trees.iter() {
let mut rpc = pool.get_connection().await.unwrap();
let queue_length = fetch_queue_item_data(&mut *rpc, &tree.nullifier_queue)
.await
.unwrap()
.len();
let queue_length = fetch_queue_item_data(
&mut *rpc,
&tree.nullifier_queue,
0,
STATE_NULLIFIER_QUEUE_VALUES,
STATE_NULLIFIER_QUEUE_VALUES,
)
.await
.unwrap()
.len();
if not_empty {
assert_ne!(queue_length, 0);
} else {
Expand All @@ -213,10 +220,16 @@ pub async fn assert_queue_len(

for tree in address_trees.iter() {
let mut rpc = pool.get_connection().await.unwrap();
let queue_length = fetch_queue_item_data(&mut *rpc, &tree.queue)
.await
.unwrap()
.len();
let queue_length = fetch_queue_item_data(
&mut *rpc,
&tree.queue,
0,
ADDRESS_QUEUE_VALUES,
ADDRESS_QUEUE_VALUES,
)
.await
.unwrap()
.len();
if not_empty {
assert_ne!(queue_length, 0);
} else {
Expand Down
1 change: 1 addition & 0 deletions forester/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub fn forester_config() -> ForesterConfig {
pushgateway_url: "http://localhost:9092/metrics/job/forester".to_string(),
},
retry_config: Default::default(),
queue_config: Default::default(),
registry_pubkey: light_registry::ID,
payer_keypair: env_accounts.forester.insecure_clone(),
indexer_batch_size: 50,
Expand Down

0 comments on commit 38e4c46

Please sign in to comment.