Skip to content

Commit

Permalink
refactor: async rollover (#1183)
Browse files Browse the repository at this point in the history
* Add refactor rollover logic

Refactored the rollover check into a separate asynchronous function to improve concurrency in transaction processing.

* Refactor variable name for clarity

* Optimize epoch manager code alignment
  • Loading branch information
sergeytimoshin authored Sep 9, 2024
1 parent 941e766 commit 4aef62d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
71 changes: 47 additions & 24 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,17 +670,21 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
// TODO: sync at some point
let mut estimated_slot = self.slot_tracker.estimated_current_slot();

debug!(
"Estimated slot: {}, epoch end: {}",
estimated_slot, epoch_info.phases.active.end
);
while estimated_slot < epoch_info.phases.active.end {
debug!("Processing queue");
debug!("Searching for next eligible slot");
// search for next eligible slot
let index_and_forester_slot = tree
.slots
.iter()
.enumerate()
.find(|(_, slot)| slot.is_some());

debug!("Result: {:?}", index_and_forester_slot);
if let Some((index, forester_slot)) = index_and_forester_slot {
debug!("Found eligible slot");
let forester_slot = forester_slot.as_ref().unwrap().clone();
tree.slots.remove(index);

Expand Down Expand Up @@ -719,50 +723,69 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
phantom: std::marker::PhantomData::<R>,
};

debug!("Sending transactions...");
let start_time = Instant::now();
let num_tx_sent = send_batched_transactions(
let batch_tx_future = send_batched_transactions(
&self.config.payer_keypair,
self.rpc_pool.clone(),
config, // TODO: define config in epoch manager
tree.tree_accounts,
&transaction_builder,
epoch_pda.epoch,
);

// Check whether the tree is ready for rollover once per slot.
// Check in parallel with sending transactions.
if is_tree_ready_for_rollover(
&mut *rpc,
tree.tree_accounts.merkle_tree,
tree.tree_accounts.tree_type,
)
.await?
{
info!("Starting {} rollover.", tree.tree_accounts.merkle_tree);
self.perform_rollover(&tree.tree_accounts).await?;
}
// Await the result of the batch transactions after the
// potential rollover.
let num_tx_sent = num_tx_sent.await?;
// Prometheus metrics
let chunk_duration = start_time.elapsed();
if self.config.enable_metrics {
queue_metric_update(epoch_info.epoch, num_tx_sent, chunk_duration).await;
let future = self.rollover_if_needed(&tree.tree_accounts);

// Wait for both operations to complete
let (num_tx_sent, rollover_result) = tokio::join!(batch_tx_future, future);
rollover_result?;

match num_tx_sent {
Ok(num_tx_sent) => {
debug!("Transactions sent successfully");
let chunk_duration = start_time.elapsed();
queue_metric_update(epoch_info.epoch, num_tx_sent, chunk_duration).await;
self.increment_processed_items_count(epoch_info.epoch, num_tx_sent)
.await;
}
Err(e) => {
error!("Failed to send transactions: {:?}", e);
}
}
// TODO: consider do we really need WorkReport
self.increment_processed_items_count(epoch_info.epoch, num_tx_sent)
.await;
} else {
debug!("No eligible slot found");
// The forester is not eligible for any more slots in the current epoch
break;
}

if self.config.enable_metrics {
process_queued_metrics().await;
if let Err(e) = push_metrics(&self.config.external_services.pushgateway_url).await {
error!("Failed to push metrics: {:?}", e);
}
}

// Yield to allow other tasks to run
tokio::task::yield_now().await;

estimated_slot = self.slot_tracker.estimated_current_slot();

debug!(
"Estimated slot: {}, epoch end: {}",
estimated_slot, epoch_info.phases.active.end
);
}
Ok(())
}

async fn rollover_if_needed(&self, tree_account: &TreeAccounts) -> Result<()> {
let mut rpc = self.rpc_pool.get_connection().await?;
if is_tree_ready_for_rollover(&mut *rpc, tree_account.merkle_tree, tree_account.tree_type)
.await?
{
info!("Starting {} rollover.", tree_account.merkle_tree);
self.perform_rollover(tree_account).await?;
}
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions forester/src/slot_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ pub async fn wait_until_slot_reached<R: RpcConnection>(
slot_duration()
};

debug!(
"Estimated slot: {}, waiting for {} seconds",
current_estimated_slot,
sleep_duration.as_secs_f64()
);
sleep(sleep_duration).await;
}

Expand Down

0 comments on commit 4aef62d

Please sign in to comment.