
Dispute coordinator - Recover disputes on startup #3481

Merged
152 changes: 141 additions & 11 deletions node/core/dispute-coordinator/src/lib.rs
@@ -35,7 +35,7 @@ use polkadot_node_subsystem::{
 	errors::{ChainApiError, RuntimeApiError},
 	messages::{
 		ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
-		DisputeParticipationMessage, ImportStatementsResult
+		DisputeParticipationMessage, ImportStatementsResult,
 	}
 };
 use polkadot_node_subsystem_util::rolling_session_window::{
@@ -71,10 +71,27 @@ const ACTIVE_DURATION_SECS: Timestamp = 180;
 /// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
 type Timestamp = u64;
 
+#[derive(Eq, PartialEq)]
+enum Recovery {
+	Pending,
+	Complete,
+}

+impl Recovery {
+	/// Returns whether recovery was already complete; if not, marks it complete.
+	fn complete(&mut self) -> bool {
+		let complete = *self == Recovery::Complete;
+		if !complete {
+			*self = Recovery::Complete;
+		}
+		complete
+	}
+}

 struct State {
 	keystore: Arc<LocalKeystore>,
 	highest_session: Option<SessionIndex>,
 	rolling_session_window: RollingSessionWindow,
+	recovery_state: Recovery,
 }

 /// Configuration for the dispute coordinator subsystem.
@@ -277,7 +294,7 @@ where
 	B: Backend,
 {
 	loop {
-		let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
+		let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await;
 		match res {
 			Err(e) => {
 				e.trace();
@@ -299,7 +316,7 @@
 //
 // A return value of `Ok` indicates that an exit should be made, while non-fatal errors
 // lead to another call to this function.
-async fn run_iteration<B, Context>(
+async fn run_until_error<B, Context>(
 	ctx: &mut Context,
 	subsystem: &DisputeCoordinatorSubsystem,
 	backend: &mut B,
@@ -314,6 +331,7 @@
 		keystore: subsystem.keystore.clone(),
 		highest_session: None,
 		rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
+		recovery_state: Recovery::Pending,
 	};
 
 	loop {
@@ -323,12 +341,22 @@
 				return Ok(())
 			}
 			FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
-				handle_new_activations(
-					ctx,
-					&mut overlay_db,
-					&mut state,
-					update.activated.into_iter().map(|a| a.hash),
-				).await?
+				let leaves = update.activated.into_iter().map(|a| a.hash);
+				if !state.recovery_state.complete() {
+					handle_leaf(
+						ctx,
+						&mut overlay_db,
+						&mut state,
+						leaves,
+					).await?;
+				} else {
+					handle_new_activations(
+						ctx,
+						&mut overlay_db,
+						&mut state,
+						leaves,
+					).await?;
+				}
 			}
 			FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
 			FromOverseer::Communication { msg } => {
@@ -349,6 +377,100 @@
 	}
 }

+// Restores the subsystem's state before proceeding with the main event loop. Primarily, this
+// repopulates the rolling session window with the relevant session information needed to handle
+// incoming import statement requests.
+//
+// This method also retransmits a `DisputeParticipationMessage::Participate` for any non-concluded
+// dispute for which the subsystem doesn't have a local statement, ensuring it eventually
+// arbitrates the dispute.
+async fn handle_leaf<Context>(
+	ctx: &mut Context,
+	overlay_db: &mut OverlayedBackend<'_, impl Backend>,
+	state: &mut State,
+	leaves: impl IntoIterator<Item = Hash>,
+) -> Result<(), Error>
+where
+	Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
+	Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
+{
+	// Process the leaves as we would during normal operation. Any writes are committed
+	// immediately so that they are made available to the primary event loop.
+	handle_new_activations(ctx, overlay_db, state, leaves).await?;

+	let recent_disputes = match overlay_db.load_recent_disputes() {
+		Ok(Some(disputes)) => disputes,
+		Ok(None) => return Ok(()),
+		Err(e) => {
+			tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
+			return Err(e.into());
+		},
+	};
+
+	// Filter out disputes that have already concluded.
+	let active_disputes = recent_disputes.into_iter()
+		.filter(|(_, status)| *status == DisputeStatus::Active)
+		.collect::<RecentDisputes>();

+	for ((session, ref candidate_hash), _) in active_disputes.into_iter() {
+		let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) {
+			Ok(Some(votes)) => votes.into(),
+			Ok(None) => continue,
+			Err(e) => {
+				tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e);
+				continue
+			},
+		};

+		let validators = match state.rolling_session_window.session_info(session) {
+			None => {
+				tracing::warn!(
+					target: LOG_TARGET,
+					session,
+					"Missing info for session which has an active dispute",
+				);
+				continue
+			}
+			Some(info) => info.validators.clone(),
+		};
+
+		let n_validators = validators.len();
+		let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();

+		// Determine if there are any missing local statements for this dispute. Validators are
+		// filtered if:
+		// 1) their statement already exists, or
+		// 2) the validator key is not in the local keystore (i.e. the validator is remote).
+		// The remaining set only contains local validators that are also missing statements.
+		let missing_local_statement = validators.iter()
+			.enumerate()
+			.map(|(index, validator)| (ValidatorIndex(index as _), validator))
+			.filter(|(index, validator)|
+				voted_indices.contains(index) ||
+				state.keystore.key_pair::<ValidatorPair>(validator).map(|v| v.is_none()).unwrap_or_default())
+			.count() > 0;
Contributor (review comment):

Suggested change:

-		let missing_local_statement = validators.iter()
-			.enumerate()
-			.map(|(index, validator)| (ValidatorIndex(index as _), validator))
-			.filter(|(index, validator)|
-				voted_indices.contains(index) ||
-				state.keystore.key_pair::<ValidatorPair>(validator).map(|v| v.is_none()).unwrap_or_default())
-			.count() > 0;
+		let missing_local_statement = validators.iter()
+			.enumerate()
+			.map(|(index, validator)| (ValidatorIndex(index as _), validator))
+			.any(|(index, validator)|
+				!voted_indices.contains(index) &&
+				state.keystore.key_pair::<ValidatorPair>(validator).ok().map_or(false, |v| v.is_some())
+			);

re: https://github.com/paritytech/polkadot/pull/3481/files#r672552299 and https://github.com/paritytech/polkadot/pull/3481/files#r672551425

Contributor (review comment):

I don't see how this is correct. The iterator contains an entry for each validator which (has voted | has no key). We really want (has not voted & has a key).

map_or is more expressive than unwrap_or_default, which implicitly evaluates to false as the default.

Contributor Author (reply):

I will write a test to ensure this property holds.

+		// Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a
+		// recorded local statement.
+		if missing_local_statement {
+			let (report_availability, receive_availability) = oneshot::channel();
+			ctx.send_message(DisputeParticipationMessage::Participate {
+				candidate_hash: *candidate_hash,
+				candidate_receipt: votes.candidate_receipt.clone(),
+				session,
+				n_validators: n_validators as u32,
+				report_availability,
+			}).await;
+
+			if !receive_availability.await? {
+				tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available");
+			}
+		}
+	}

+	Ok(())
+}
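Note: the availability report above is a oneshot round-trip: the sender half travels inside the Participate message and the subsystem awaits the receiver half. A minimal sketch of the pattern (names and the bool payload are illustrative, not the PR's API):

use futures::channel::oneshot;

async fn round_trip() -> Result<(), oneshot::Canceled> {
	let (report_availability, receive_availability) = oneshot::channel::<bool>();

	// In the subsystem, this sender is embedded in the Participate message, and the
	// participation logic answers once it knows whether the candidate data is available.
	let _ = report_availability.send(true);

	// `.await?` surfaces a dropped (never answered) sender as `oneshot::Canceled`,
	// which handle_leaf's `receive_availability.await?` maps into its own error type.
	let available = receive_availability.await?;
	assert!(available);
	Ok(())
}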

 async fn handle_new_activations(
 	ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
 	overlay_db: &mut OverlayedBackend<'_, impl Backend>,
@@ -470,8 +592,8 @@ async fn handle_incoming(
 		) => {
 			issue_local_statement(
 				ctx,
-				state,
 				overlay_db,
+				state,
 				candidate_hash,
 				candidate_receipt,
 				session,
@@ -547,6 +669,10 @@ async fn handle_import_statements(
"Missing info for session which has an active dispute",
);

pending_confirmation
.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;

return Ok(())
}
Some(info) => info.validators.clone(),
Expand Down Expand Up @@ -671,13 +797,17 @@ async fn handle_import_statements(

 	overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
 
+	pending_confirmation
+		.send(ImportStatementsResult::ValidImport)
+		.map_err(|_| Error::OneshotSend)?;
+
 	Ok(())
 }
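The two hunks above establish a confirmation contract: every path through the import handler must answer the caller's pending_confirmation oneshot, replying InvalidImport on early returns and ValidImport on success. A sketch of the caller's side (shapes are assumed for illustration, not the PR's exact API):

use futures::channel::oneshot;

enum ImportStatementsResult {
	InvalidImport,
	ValidImport,
}

async fn await_import_confirmation(rx: oneshot::Receiver<ImportStatementsResult>) -> bool {
	match rx.await {
		Ok(ImportStatementsResult::ValidImport) => true,
		// InvalidImport, or the handler dropped the sender without replying.
		_ => false,
	}
}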

 async fn issue_local_statement(
 	ctx: &mut impl SubsystemContext,
-	state: &mut State,
 	overlay_db: &mut OverlayedBackend<'_, impl Backend>,
+	state: &mut State,
 	candidate_hash: CandidateHash,
 	candidate_receipt: CandidateReceipt,
 	session: SessionIndex,