Skip to content

Commit

Permalink
[stable2407] Backport #6807 (#7155)
Browse files Browse the repository at this point in the history
Backport #6807 into `stable2407` from alexggh.

See the
[documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md)
on how to use this bot.

<!--
  # To be used by other automation, do not modify:
  original-pr-number: #${pull_number}
-->

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Co-authored-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
  • Loading branch information
3 people authored Jan 15, 2025
1 parent 83db0a1 commit c983de5
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 8 deletions.
141 changes: 134 additions & 7 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.
use futures_timer::Delay;
use itertools::Itertools;
use jaeger::{hash_to_trace_identifier, PerLeafSpan};
use polkadot_node_jaeger as jaeger;
Expand Down Expand Up @@ -124,6 +125,9 @@ const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
const APPROVAL_CACHE_SIZE: u32 = 1024;

const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
/// The maximum number of times we retry to approve a block if it is still needed.
const MAX_APPROVAL_RETRIES: u32 = 16;

const APPROVAL_DELAY: Tick = 2;
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

Expand Down Expand Up @@ -166,6 +170,10 @@ pub struct ApprovalVotingSubsystem {
mode: Mode,
metrics: Metrics,
clock: Box<dyn Clock + Send + Sync>,
/// The maximum number of times we retry to approve a block if it is still needed and PoV fetch failed.
max_approval_retries: u32,
/// The backoff before we retry the approval.
retry_backoff: Duration,
}

#[derive(Clone)]
Expand Down Expand Up @@ -492,6 +500,8 @@ impl ApprovalVotingSubsystem {
sync_oracle,
metrics,
Box::new(SystemClock {}),
MAX_APPROVAL_RETRIES,
APPROVAL_CHECKING_TIMEOUT / 2,
)
}

Expand All @@ -503,6 +513,8 @@ impl ApprovalVotingSubsystem {
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
clock: Box<dyn Clock + Send + Sync>,
max_approval_retries: u32,
retry_backoff: Duration,
) -> Self {
ApprovalVotingSubsystem {
keystore,
Expand All @@ -512,6 +524,8 @@ impl ApprovalVotingSubsystem {
mode: Mode::Syncing(sync_oracle),
metrics,
clock,
max_approval_retries,
retry_backoff,
}
}

Expand Down Expand Up @@ -701,18 +715,53 @@ enum ApprovalOutcome {
TimedOut,
}

/// Parameters needed to launch another approval check for a candidate after an
/// earlier attempt failed in a way that is worth retrying.
///
/// A value is passed into `launch_approval`. When that attempt reports the data as
/// unavailable and `attempts_remaining` is still above zero, it waits for `backoff`
/// and returns a new value with `attempts_remaining` reduced by one.
#[derive(Clone)]
struct RetryApprovalInfo {
/// Receipt of the candidate whose approval should be attempted again.
candidate: CandidateReceipt,
/// Backing group forwarded to `launch_approval` on the retry.
backing_group: GroupIndex,
/// Executor parameters forwarded to `launch_approval` on the retry.
executor_params: ExecutorParams,
/// Core index forwarded to `launch_approval` on the retry, if one is known.
core_index: Option<CoreIndex>,
/// Session index forwarded to `launch_approval` on the retry.
session_index: SessionIndex,
/// Number of further retries still allowed; no retry is scheduled once this is zero.
attempts_remaining: u32,
/// How long to wait before handing the retry back to the subsystem.
backoff: Duration,
}

/// Result of a background approval check, as consumed by the subsystem's main loop.
struct ApprovalState {
/// Validator index placed into the `ApprovalVoteRequest` when the check is approved.
validator_index: ValidatorIndex,
/// Hash of the candidate that was checked.
candidate_hash: CandidateHash,
/// Whether the check was approved, failed, or timed out.
approval_outcome: ApprovalOutcome,
/// When `Some`, the main loop launches the approval again for every relay block
/// whose block entry can still be loaded from the database.
retry_info: Option<RetryApprovalInfo>,
}

impl ApprovalState {
/// Builds a state with `ApprovalOutcome::Approved` and no retry scheduled.
fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
// NOTE(review): the one-line initializer below lacks `retry_info` and is directly followed
// by the full form; it looks like the pre-change line kept by the diff view — confirm.
Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Approved }
Self {
validator_index,
candidate_hash,
approval_outcome: ApprovalOutcome::Approved,
retry_info: None,
}
}
/// Builds a state with `ApprovalOutcome::Failed` and no retry scheduled.
fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
// NOTE(review): same as in `approved` — the one-line initializer below appears to be the
// pre-change line kept by the diff view — confirm.
Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Failed }
Self {
validator_index,
candidate_hash,
approval_outcome: ApprovalOutcome::Failed,
retry_info: None,
}
}

/// Builds a state with `ApprovalOutcome::Failed` that carries the caller-supplied
/// `retry_info`; passing `None` yields the same value as `failed`.
fn failed_with_retry(
validator_index: ValidatorIndex,
candidate_hash: CandidateHash,
retry_info: Option<RetryApprovalInfo>,
) -> Self {
Self {
validator_index,
candidate_hash,
approval_outcome: ApprovalOutcome::Failed,
retry_info,
}
}
}

Expand Down Expand Up @@ -752,6 +801,7 @@ impl CurrentlyCheckingSet {
candidate_hash,
validator_index,
approval_outcome: ApprovalOutcome::TimedOut,
retry_info: None,
},
Some(approval_state) => approval_state,
}
Expand Down Expand Up @@ -1236,25 +1286,69 @@ where
validator_index,
candidate_hash,
approval_outcome,
retry_info,
}
) = approval_state;

if matches!(approval_outcome, ApprovalOutcome::Approved) {
let mut approvals: Vec<Action> = relay_block_hashes
.into_iter()
.iter()
.map(|block_hash|
Action::IssueApproval(
candidate_hash,
ApprovalVoteRequest {
validator_index,
block_hash,
block_hash: *block_hash,
},
)
)
.collect();
actions.append(&mut approvals);
}

if let Some(retry_info) = retry_info {
for block_hash in relay_block_hashes {
if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
let ctx = &mut ctx;
let metrics = subsystem.metrics.clone();
let retry_info = retry_info.clone();
let executor_params = retry_info.executor_params.clone();
let candidate = retry_info.candidate.clone();
let launch_approval_span = state
.spans
.get(&block_hash)
.map(|span| span.child("launch-approval"))
.unwrap_or_else(|| jaeger::Span::new(candidate_hash, "launch-approval"))
.with_trace_id(candidate_hash)
.with_candidate(candidate_hash)
.with_stage(jaeger::Stage::ApprovalChecking);
currently_checking_set
.insert_relay_block_hash(
candidate_hash,
validator_index,
block_hash,
async move {
launch_approval(
ctx,
metrics,
retry_info.session_index,
candidate,
validator_index,
block_hash,
retry_info.backing_group,
executor_params,
retry_info.core_index,
&launch_approval_span,
retry_info,
)
.await
},
)
.await?;
}
}
}

actions
},
(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
Expand Down Expand Up @@ -1302,6 +1396,8 @@ where
&mut approvals_cache,
&mut subsystem.mode,
actions,
subsystem.max_approval_retries,
subsystem.retry_backoff,
)
.await?
{
Expand Down Expand Up @@ -1350,6 +1446,8 @@ async fn handle_actions<Context>(
approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
mode: &mut Mode,
actions: Vec<Action>,
max_approval_retries: u32,
retry_backoff: Duration,
) -> SubsystemResult<bool> {
let mut conclude = false;
let mut actions_iter = actions.into_iter();
Expand Down Expand Up @@ -1442,6 +1540,16 @@ async fn handle_actions<Context>(
None => {
let ctx = &mut *ctx;

let retry = RetryApprovalInfo {
candidate: candidate.clone(),
backing_group,
executor_params: executor_params.clone(),
core_index,
session_index: session,
attempts_remaining: max_approval_retries,
backoff: retry_backoff,
};

currently_checking_set
.insert_relay_block_hash(
candidate_hash,
Expand All @@ -1459,6 +1567,7 @@ async fn handle_actions<Context>(
executor_params,
core_index,
&launch_approval_span,
retry,
)
.await
},
Expand Down Expand Up @@ -3389,6 +3498,7 @@ async fn launch_approval<Context>(
executor_params: ExecutorParams,
core_index: Option<CoreIndex>,
span: &jaeger::Span,
retry: RetryApprovalInfo,
) -> SubsystemResult<RemoteHandle<ApprovalState>> {
let (a_tx, a_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel();
Expand Down Expand Up @@ -3420,6 +3530,7 @@ async fn launch_approval<Context>(

let candidate_hash = candidate.hash();
let para_id = candidate.descriptor.para_id;
let mut next_retry = None;
gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");

let request_validation_data_span = span
Expand Down Expand Up @@ -3458,7 +3569,6 @@ async fn launch_approval<Context>(
let background = async move {
// Force the move of the timer into the background task.
let _timer = timer;

let available_data = match a_rx.await {
Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
Ok(Ok(a)) => a,
Expand All @@ -3469,10 +3579,27 @@ async fn launch_approval<Context>(
target: LOG_TARGET,
?para_id,
?candidate_hash,
attempts_remaining = retry.attempts_remaining,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// do nothing. we'll just be a no-show and that'll cause others to rise up.
// Availability could fail if we did not discover much of the network, so
// let's back off and order the subsystem to retry at a later point if the
// approval is still needed, because no-show wasn't covered yet.
if retry.attempts_remaining > 0 {
Delay::new(retry.backoff).await;
next_retry = Some(RetryApprovalInfo {
candidate,
backing_group,
executor_params,
core_index,
session_index,
attempts_remaining: retry.attempts_remaining - 1,
backoff: retry.backoff,
});
} else {
next_retry = None;
}
metrics_guard.take().on_approval_unavailable();
},
&RecoveryError::ChannelClosed => {
Expand Down Expand Up @@ -3503,7 +3630,7 @@ async fn launch_approval<Context>(
metrics_guard.take().on_approval_invalid();
},
}
return ApprovalState::failed(validator_index, candidate_hash)
return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
},
};
drop(request_validation_data_span);
Expand Down
Loading

0 comments on commit c983de5

Please sign in to comment.