Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Address Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Lldenaurois committed Jul 18, 2021
1 parent ee69eaf commit f3c30fa
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 56 deletions.
86 changes: 40 additions & 46 deletions node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult, ChainSelectionMessage,
DisputeParticipationMessage, ImportStatementsResult,
}
};
use polkadot_node_subsystem_util::rolling_session_window::{
Expand Down Expand Up @@ -277,7 +277,7 @@ where
B: Backend,
{
loop {
let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
Expand All @@ -299,7 +299,7 @@ where
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_iteration<B, Context>(
async fn run_until_error<B, Context>(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B,
Expand All @@ -316,21 +316,19 @@ where
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
};

resume(ctx, &mut state, backend).await?;

loop {
let mut overlay_db = OverlayedBackend::new(backend);
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(())
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_new_activations(
handle_leaf(
ctx,
&mut overlay_db,
&mut state,
update.activated.into_iter().map(|a| a.hash),
).await?
).await?;
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } => {
Expand All @@ -355,53 +353,44 @@ where
// repopulates the rolling session window with the relevant session information to handle incoming
// import statement requests.
//
// This method also retransmits a DisputeParticiationMessage::Participate for any non-concluded
// This method also retransmits a `DisputeParticipationMessage::Participate` for any non-concluded
// disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an
// arbitration on the disupte.
async fn resume<B, Context>(
// arbitration on the dispute.
async fn handle_leaf<Context>(
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
backend: &mut B,
leaves: impl IntoIterator<Item = Hash>,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
B: Backend,
{

// Query for the set of active leaves to reconstruct our rolling session window.
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainSelectionMessage::Leaves(tx)).await;
let leaves = rx.await?;

// Process the leaves as we would during normal operation. Any writes are committed
// immediately so that they are made available to the primary event loop.
let mut overlay_db = OverlayedBackend::new(backend);
handle_new_activations(ctx, &mut overlay_db, state, leaves).await.unwrap();
let write_ops = overlay_db.into_write_ops();
backend.write(write_ops)?;
handle_new_activations(ctx, overlay_db, state, leaves).await?;

let recent_disputes = match backend.load_recent_disputes() {
let recent_disputes = match overlay_db.load_recent_disputes() {
Ok(Some(disputes)) => disputes,
Ok(None) => return Ok(()),
Err(e) => {
tracing::error!(target: LOG_TARGET, "failed initial load of recent disputes: {:?}", e);
tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into());
},
};

// Filter out disputes that have already concluded.
let active_disputes = recent_disputes.into_iter()
.filter(|((_, _), status)| status.concluded_at().is_none())
.filter(|(_, status)| *status == DisputeStatus::Active)
.collect::<RecentDisputes>();

for ((session, ref candidate_hash), _) in active_disputes.into_iter() {
let votes: CandidateVotes = match backend.load_candidate_votes(session, candidate_hash) {
let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
Ok(None) => continue,
Err(e) => {
tracing::error!(target: LOG_TARGET, "failed initial load of candidate votes: {:?}", e);
continue;
tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e);
continue
},
};

Expand All @@ -412,25 +401,30 @@ where
session,
"Missing info for session which has an active dispute",
);

continue;
continue
}
Some(info) => info.validators.clone(),
};

let n_validators = validators.len();
let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();
for (index, validator) in validators.iter().enumerate() {
let index = ValidatorIndex(index as _);
if voted_indices.contains(&index) { continue }

// Skip non-local validators.
if state.keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
continue
}

// Send a DisputeParticipationMessage for all non-concluded disputes that for which
// we have not recorded a local statement.
// Determine if there are any missing local statements for this dispute. Validators are
// filtered if:
// 1) their statement already exists, or
// 2) the validator key is not in the local keystore (i.e. the validator is remote).
// The remaining set only contains local validators that are also missing statements.
let missing_local_statement = validators.iter()
.enumerate()
.map(|(index, validator)| (ValidatorIndex(index as _), validator))
.filter(|(index, validator)|
voted_indices.contains(index) ||
state.keystore.key_pair::<ValidatorPair>(validator).map(|v| v.is_none()).unwrap_or_default())
.count() > 0;

// Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a
// recorded local statement.
if missing_local_statement {
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash: *candidate_hash,
Expand All @@ -440,8 +434,9 @@ where
report_availability,
}).await;

// TODO(ladi): do we care about the response here?
let _ = receive_availability.await;
if !receive_availability.await? {
tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available");
}
}
}

Expand Down Expand Up @@ -569,8 +564,8 @@ async fn handle_incoming(
) => {
issue_local_statement(
ctx,
state,
overlay_db,
state,
candidate_hash,
candidate_receipt,
session,
Expand Down Expand Up @@ -770,16 +765,15 @@ async fn handle_import_statements(

overlay_db.write_candidate_votes(session, candidate_hash, votes.into());

// TODO(ladi): should we respond with ImportStatementsResult::ValidImport?
//pending_confirmation.send(ImportStatementsResult::ValidImport) .map_err(|_| Error::OneshotSend)?;
pending_confirmation.send(ImportStatementsResult::ValidImport) .map_err(|_| Error::OneshotSend)?;

Ok(())
}

async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
Expand Down
29 changes: 19 additions & 10 deletions node/core/dispute-coordinator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,24 @@ impl TestState {

async fn handle_resume_sync(&self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex) {
let leaves: Vec<Hash> = self.headers.keys().cloned().collect();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::ChainSelection(ChainSelectionMessage::Leaves(tx)) => {
tx.send(leaves.clone()).unwrap();
}
);
for leaf in leaves.iter() {
virtual_overseer.send(
FromOverseer::Signal(
OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(
ActivatedLeaf {
hash: *leaf,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
status: LeafStatus::Fresh,
}
)
)
)
).await;

for leaf in leaves {
let header = self.headers.get(&leaf).unwrap().clone();
self.handle_sync_queries(virtual_overseer, leaf, header, session).await;
let header = self.headers.get(leaf).unwrap().clone();
self.handle_sync_queries(virtual_overseer, *leaf, header, session).await;
}
}

Expand Down Expand Up @@ -1153,7 +1161,7 @@ fn resume_dispute_without_local_statement() {
false,
).await;

let (pending_confirmation, _confirmation_rx) = oneshot::channel();
let (pending_confirmation, confirmation_rx) = oneshot::channel();
virtual_overseer.send(FromOverseer::Communication {
msg: DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
Expand All @@ -1166,6 +1174,7 @@ fn resume_dispute_without_local_statement() {
pending_confirmation,
},
}).await;
assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport));

assert_matches!(
virtual_overseer.recv().await,
Expand Down

0 comments on commit f3c30fa

Please sign in to comment.