Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: async rollover #1183

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,17 +670,21 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
// TODO: sync at some point
let mut estimated_slot = self.slot_tracker.estimated_current_slot();

debug!(
"Estimated slot: {}, epoch end: {}",
estimated_slot, epoch_info.phases.active.end
);
while estimated_slot < epoch_info.phases.active.end {
debug!("Processing queue");
debug!("Searching for next eligible slot");
// search for next eligible slot
let index_and_forester_slot = tree
.slots
.iter()
.enumerate()
.find(|(_, slot)| slot.is_some());

debug!("Result: {:?}", index_and_forester_slot);
if let Some((index, forester_slot)) = index_and_forester_slot {
debug!("Found eligible slot");
let forester_slot = forester_slot.as_ref().unwrap().clone();
tree.slots.remove(index);

Expand Down Expand Up @@ -719,50 +723,69 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
phantom: std::marker::PhantomData::<R>,
};

debug!("Sending transactions...");
let start_time = Instant::now();
let num_tx_sent = send_batched_transactions(
let batch_tx_future = send_batched_transactions(
&self.config.payer_keypair,
self.rpc_pool.clone(),
config, // TODO: define config in epoch manager
tree.tree_accounts,
&transaction_builder,
epoch_pda.epoch,
);

// Check whether the tree is ready for rollover once per slot.
// Check in parallel with sending transactions.
if is_tree_ready_for_rollover(
&mut *rpc,
tree.tree_accounts.merkle_tree,
tree.tree_accounts.tree_type,
)
.await?
{
info!("Starting {} rollover.", tree.tree_accounts.merkle_tree);
self.perform_rollover(&tree.tree_accounts).await?;
}
// Await the result of the batch transactions after the
// potential rollover.
let num_tx_sent = num_tx_sent.await?;
// Prometheus metrics
let chunk_duration = start_time.elapsed();
if self.config.enable_metrics {
queue_metric_update(epoch_info.epoch, num_tx_sent, chunk_duration).await;
let future = self.rollover_if_needed(&tree.tree_accounts);

// Wait for both operations to complete
let (num_tx_sent, rollover_result) = tokio::join!(batch_tx_future, future);
rollover_result?;

match num_tx_sent {
Ok(num_tx_sent) => {
debug!("Transactions sent successfully");
let chunk_duration = start_time.elapsed();
queue_metric_update(epoch_info.epoch, num_tx_sent, chunk_duration).await;
self.increment_processed_items_count(epoch_info.epoch, num_tx_sent)
.await;
}
Err(e) => {
error!("Failed to send transactions: {:?}", e);
}
}
// TODO: consider do we really need WorkReport
self.increment_processed_items_count(epoch_info.epoch, num_tx_sent)
.await;
} else {
debug!("No eligible slot found");
// The forester is not eligible for any more slots in the current epoch
break;
}

if self.config.enable_metrics {
process_queued_metrics().await;
if let Err(e) = push_metrics(&self.config.external_services.pushgateway_url).await {
error!("Failed to push metrics: {:?}", e);
}
}

// Yield to allow other tasks to run
tokio::task::yield_now().await;

estimated_slot = self.slot_tracker.estimated_current_slot();

debug!(
"Estimated slot: {}, epoch end: {}",
estimated_slot, epoch_info.phases.active.end
);
}
Ok(())
}

async fn rollover_if_needed(&self, tree_account: &TreeAccounts) -> Result<()> {
let mut rpc = self.rpc_pool.get_connection().await?;
if is_tree_ready_for_rollover(&mut *rpc, tree_account.merkle_tree, tree_account.tree_type)
.await?
{
info!("Starting {} rollover.", tree_account.merkle_tree);
self.perform_rollover(tree_account).await?;
}
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions forester/src/slot_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ pub async fn wait_until_slot_reached<R: RpcConnection>(
slot_duration()
};

debug!(
"Estimated slot: {}, waiting for {} seconds",
current_estimated_slot,
sleep_duration.as_secs_f64()
);
sleep(sleep_duration).await;
}

Expand Down