From ec1cd25080a35299ebfd349a5ad38356a46720c5 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 6 Sep 2024 23:38:38 +0700 Subject: [PATCH] Refactor epoch processing; add retry logic for registration. Consolidate repeated active phase work processing into a new function `process_epoch_work`. Introduce `register_for_epoch_with_retry` to handle registration retries with a specified maximum number of attempts and delay duration. --- forester/src/epoch_manager.rs | 90 ++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index d592342b55..0212782f80 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -222,48 +222,36 @@ impl> EpochManager { let previous_phases = get_epoch_phases(&self.protocol_config, current_epoch.saturating_sub(1)); - // Check if we're in the active phase of the previous epoch + // Process previous epoch if still in active phase if slot >= previous_phases.active.start && slot < previous_phases.active.end { info!( - "Currently in active phase of previous epoch {}. Processing remaining work.", + "Processing remaining work for previous epoch {}", current_epoch - 1 ); - - match self.recover_registration_info(current_epoch - 1).await { - Ok(previous_registration_info) => { - self.perform_active_work(&previous_registration_info) - .await?; - } - Err(e) => { - warn!( - "Failed to recover registration info for previous epoch {}: {:?}", - current_epoch - 1, - e - ); - } - } + self.process_epoch_work(current_epoch - 1).await?; } - // Check if we're in the active phase of the current epoch + // Process current epoch if slot >= current_phases.active.start && slot < current_phases.active.end { - info!( - "Currently in active phase of current epoch {}. 
Processing remaining work.", - current_epoch - ); + info!("Processing work for current epoch {}", current_epoch); + self.process_epoch_work(current_epoch).await?; + } - match self.recover_registration_info(current_epoch).await { - Ok(current_registration_info) => { - self.perform_active_work(&current_registration_info).await?; - } - Err(e) => { - warn!( - "Failed to recover registration info for current epoch {}: {:?}", - current_epoch, e - ); - } + Ok(()) + } + + async fn process_epoch_work(&self, epoch: u64) -> Result<()> { + match self.recover_registration_info(epoch).await { + Ok(registration_info) => { + self.perform_active_work(&registration_info).await?; + } + Err(e) => { + warn!( + "Failed to recover registration info for epoch {}: {:?}", + epoch, e + ); } } - Ok(()) } @@ -273,7 +261,9 @@ impl> EpochManager { info!("Entering process_epoch"); // Registration - let mut registration_info = self.register_for_epoch(epoch).await?; + let mut registration_info = self + .register_for_epoch_with_retry(epoch, 100, Duration::from_millis(200)) + .await?; // Wait for active phase registration_info = self.wait_for_active_phase(&registration_info).await?; @@ -299,6 +289,40 @@ impl> EpochManager { Ok((slot, self.protocol_config.get_current_epoch(slot))) } + #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch + ))] + async fn register_for_epoch_with_retry( + &self, + epoch: u64, + max_retries: u32, + retry_delay: Duration, + ) -> Result { + for attempt in 0..max_retries { + match self.register_for_epoch(epoch).await { + Ok(registration_info) => return Ok(registration_info), + Err(e) => { + warn!( + "Failed to register for epoch {} (attempt {}): {:?}", + epoch, + attempt + 1, + e + ); + if attempt < max_retries - 1 { + sleep(retry_delay).await; + } else { + return Err(e); + } + } + } + } + Err(ForesterError::Custom(format!( + "Failed to register for epoch {} after {} attempts", + epoch, max_retries + ))) + } + +
#[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch + ))] async fn register_for_epoch(&self, epoch: u64) -> Result { info!("Registering for epoch: {}", epoch); let mut rpc = self.rpc_pool.get_connection().await?;