diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 1b9bdf8366..54e7963e37 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -20,6 +20,7 @@ use forester_utils::forester_epoch::{ }; use forester_utils::indexer::{Indexer, MerkleProof, NewAddressProofWithContext}; use forester_utils::rpc::RpcConnection; +use futures::future::join_all; use light_registry::protocol_config::state::ProtocolConfig; use light_registry::sdk::{ create_finalize_registration_instruction, create_report_work_instruction, @@ -33,6 +34,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::task::JoinHandle; use tokio::time::{sleep, Instant}; use tracing::{debug, error, info, info_span, instrument, warn}; @@ -456,26 +458,28 @@ impl> EpochManager { let current_slot = rpc.get_slot().await?; self.slot_tracker.update(current_slot); } + + let self_arc = Arc::new(self.clone()); + let epoch_info_arc = Arc::new(epoch_info.clone()); + + let mut handles: Vec>> = Vec::new(); + for tree in epoch_info.trees.iter() { info!("Creating thread for queue {}", tree.tree_accounts.queue); - // TODO: inefficient try to only clone necessary fields - let epoch_info_clone = epoch_info.clone(); - let self_clone = self.clone(); + let self_clone = self_arc.clone(); + let epoch_info_clone = epoch_info_arc.clone(); let tree = tree.clone(); - // TODO: consider passing global shutdown signal (might be overkill - // since we have timeouts) - tokio::spawn(async move { - if let Err(e) = self_clone + let handle = tokio::spawn(async move { + self_clone .process_queue( - epoch_info_clone.epoch, // TODO: only clone the necessary fields - epoch_info_clone.forester_epoch_pda.clone(), + &epoch_info_clone.epoch, + &epoch_info_clone.forester_epoch_pda, tree, ) .await - { - error!("Error processing queue: {:?}", e); - } }); + + handles.push(handle); } info!("Threads created. Waiting for active phase to end"); @@ -488,6 +492,16 @@ impl> EpochManager { estimated_slot, active_phase_end ); + // Wait for all tasks to complete + let results = join_all(handles).await; + for result in results { + match result { + Ok(Ok(())) => {} + Ok(Err(e)) => error!("Error processing queue: {:?}", e), + Err(e) => error!("Task panicked: {:?}", e), + } + } + info!("Completed active work"); Ok(()) } @@ -500,8 +514,8 @@ impl> EpochManager { )] pub async fn process_queue( &self, - epoch_info: Epoch, - epoch_pda: ForesterEpochPda, + epoch_info: &Epoch, + epoch_pda: &ForesterEpochPda, mut tree: TreeForesterSchedule, ) -> Result<()> { debug!("enter process_queue");