Skip to content

Commit

Permalink
refactor: forester: use handles when spawning queues (#1148)
Browse files Browse the repository at this point in the history
* Refactor epoch processing to use task handles

Replace fire-and-forget task spawning with a vector of `JoinHandle`s so the per-queue tasks can be tracked. Use `join_all` to wait for every task to finish, logging queue-processing errors and task panics instead of silently dropping them.

* Refactor import statements

Moved the `futures::future::join_all` import to sit with the other grouped imports for better organization, and removed an unnecessary trailing comma.

* Remove redundant clone call in process_queue
  • Loading branch information
sergeytimoshin authored Sep 3, 2024
1 parent 284133b commit 77e5760
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use forester_utils::forester_epoch::{
};
use forester_utils::indexer::{Indexer, MerkleProof, NewAddressProofWithContext};
use forester_utils::rpc::RpcConnection;
use futures::future::join_all;
use light_registry::protocol_config::state::ProtocolConfig;
use light_registry::sdk::{
create_finalize_registration_instruction, create_report_work_instruction,
Expand All @@ -33,6 +34,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, info_span, instrument, warn};

Expand Down Expand Up @@ -456,26 +458,28 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
let current_slot = rpc.get_slot().await?;
self.slot_tracker.update(current_slot);
}

let self_arc = Arc::new(self.clone());
let epoch_info_arc = Arc::new(epoch_info.clone());

let mut handles: Vec<JoinHandle<Result<()>>> = Vec::new();

for tree in epoch_info.trees.iter() {
info!("Creating thread for queue {}", tree.tree_accounts.queue);
// TODO: inefficient try to only clone necessary fields
let epoch_info_clone = epoch_info.clone();
let self_clone = self.clone();
let self_clone = self_arc.clone();
let epoch_info_clone = epoch_info_arc.clone();
let tree = tree.clone();
// TODO: consider passing global shutdown signal (might be overkill
// since we have timeouts)
tokio::spawn(async move {
if let Err(e) = self_clone
let handle = tokio::spawn(async move {
self_clone
.process_queue(
epoch_info_clone.epoch, // TODO: only clone the necessary fields
epoch_info_clone.forester_epoch_pda.clone(),
&epoch_info_clone.epoch,
&epoch_info_clone.forester_epoch_pda,
tree,
)
.await
{
error!("Error processing queue: {:?}", e);
}
});

handles.push(handle);
}

info!("Threads created. Waiting for active phase to end");
Expand All @@ -488,6 +492,16 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
estimated_slot, active_phase_end
);

// Wait for all tasks to complete
let results = join_all(handles).await;
for result in results {
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => error!("Error processing queue: {:?}", e),
Err(e) => error!("Task panicked: {:?}", e),
}
}

info!("Completed active work");
Ok(())
}
Expand All @@ -500,8 +514,8 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
)]
pub async fn process_queue(
&self,
epoch_info: Epoch,
epoch_pda: ForesterEpochPda,
epoch_info: &Epoch,
epoch_pda: &ForesterEpochPda,
mut tree: TreeForesterSchedule,
) -> Result<()> {
debug!("enter process_queue");
Expand Down

0 comments on commit 77e5760

Please sign in to comment.