Skip to content

Commit

Permalink
Refactor epoch processing; add retry logic for registration.
Browse files Browse the repository at this point in the history
Consolidate repeated active phase work processing into a new function `process_epoch_work`. Introduce `register_for_epoch_with_retry` to handle registration retries with a specified maximum number of attempts and delay duration.
  • Loading branch information
sergeytimoshin committed Sep 6, 2024
1 parent 815b8e6 commit ec1cd25
Showing 1 changed file with 57 additions and 33 deletions.
90 changes: 57 additions & 33 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,48 +222,36 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
let previous_phases =
get_epoch_phases(&self.protocol_config, current_epoch.saturating_sub(1));

// Check if we're in the active phase of the previous epoch
// Process previous epoch if still in active phase
if slot >= previous_phases.active.start && slot < previous_phases.active.end {
info!(
"Currently in active phase of previous epoch {}. Processing remaining work.",
"Processing remaining work for previous epoch {}",
current_epoch - 1
);

match self.recover_registration_info(current_epoch - 1).await {
Ok(previous_registration_info) => {
self.perform_active_work(&previous_registration_info)
.await?;
}
Err(e) => {
warn!(
"Failed to recover registration info for previous epoch {}: {:?}",
current_epoch - 1,
e
);
}
}
self.process_epoch_work(current_epoch - 1).await?;
}

// Check if we're in the active phase of the current epoch
// Process current epoch
if slot >= current_phases.active.start && slot < current_phases.active.end {
info!(
"Currently in active phase of current epoch {}. Processing remaining work.",
current_epoch
);
info!("Processing work for current epoch {}", current_epoch);
self.process_epoch_work(current_epoch).await?;
}

match self.recover_registration_info(current_epoch).await {
Ok(current_registration_info) => {
self.perform_active_work(&current_registration_info).await?;
}
Err(e) => {
warn!(
"Failed to recover registration info for current epoch {}: {:?}",
current_epoch, e
);
}
Ok(())
}

async fn process_epoch_work(&self, epoch: u64) -> Result<()> {
match self.recover_registration_info(epoch).await {
Ok(registration_info) => {
self.perform_active_work(&registration_info).await?;
}
Err(e) => {
warn!(
"Failed to recover registration info for epoch {}: {:?}",
epoch, e
);
}
}

Ok(())
}

Expand All @@ -273,7 +261,9 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
info!("Entering process_epoch");

// Registration
let mut registration_info = self.register_for_epoch(epoch).await?;
let mut registration_info = self
.register_for_epoch_with_retry(epoch, 100, Duration::from_millis(200))
.await?;

// Wait for active phase
registration_info = self.wait_for_active_phase(&registration_info).await?;
Expand All @@ -299,6 +289,40 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
Ok((slot, self.protocol_config.get_current_epoch(slot)))
}

#[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch
))]
async fn register_for_epoch_with_retry(
&self,
epoch: u64,
max_retries: u32,
retry_delay: Duration,
) -> Result<ForesterEpochInfo> {
for attempt in 0..max_retries {
match self.register_for_epoch(epoch).await {
Ok(registration_info) => return Ok(registration_info),
Err(e) => {
warn!(
"Failed to register for epoch {} (attempt {}): {:?}",
epoch,
attempt + 1,
e
);
if attempt < max_retries - 1 {
sleep(retry_delay).await;
} else {
return Err(e);
}
}
}
}
Err(ForesterError::Custom(format!(
"Failed to register for epoch {} after {} attempts",
epoch, max_retries
)))
}

#[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch
))]
async fn register_for_epoch(&self, epoch: u64) -> Result<ForesterEpochInfo> {
info!("Registering for epoch: {}", epoch);
let mut rpc = self.rpc_pool.get_connection().await?;
Expand Down

0 comments on commit ec1cd25

Please sign in to comment.