From a8b3c37924c064ef2959a6cb13a00ba93062a26f Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Thu, 15 Jul 2021 20:45:36 -0400 Subject: [PATCH 1/8] node/dispute-coordinator: Introduce resume capability This commit introduces a resume capability for the dispute coordinator subsystem. Specifically, this will allow to recover data for disputes for which we have no local statements. --- node/core/dispute-coordinator/src/lib.rs | 33 ++++++++- node/core/dispute-coordinator/src/tests.rs | 83 +++++++++++++++++++--- 2 files changed, 105 insertions(+), 11 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index f2b956f3fe6d..2cb88e72a8db 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -35,7 +35,7 @@ use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, messages::{ ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, - DisputeParticipationMessage, ImportStatementsResult + DisputeParticipationMessage, ImportStatementsResult, ChainSelectionMessage, } }; use polkadot_node_subsystem_util::rolling_session_window::{ @@ -316,6 +316,8 @@ where rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), }; + resume(ctx, &mut state, backend).await?; + loop { let mut overlay_db = OverlayedBackend::new(backend); match ctx.recv().await? { @@ -349,6 +351,35 @@ where } } +// Restores the subsystem's state before proceeding with the main event loop. Primarily, this +// repopulates the rolling session window the relevant session information to handle incoming +// import statement requests. +async fn resume( + ctx: &mut Context, + state: &mut State, + backend: &mut B, +) -> Result<(), Error> +where + Context: overseer::SubsystemContext, + Context: SubsystemContext, + B: Backend, +{ + + // Query for the set of active leaves to reconstruct our rolling session window. + let (tx, rx) = oneshot::channel(); + ctx.send_message(ChainSelectionMessage::Leaves(tx)).await; + let leaves = rx.await?; + + // Process the leaves as we would during normal operation. Any writes are committed + // immediately so that they are made available to the primary event loop. + let mut overlay_db = OverlayedBackend::new(backend); + handle_new_activations(ctx, &mut overlay_db, state, leaves).await.unwrap(); + let write_ops = overlay_db.into_write_ops(); + backend.write(write_ops)?; + + Ok(()) +} + async fn handle_new_activations( ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), overlay_db: &mut OverlayedBackend<'_, impl Backend>, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 92ca2f6ffe3e..582917e3f36a 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use std::collections::HashMap; use super::*; use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo}; @@ -86,6 +87,7 @@ struct TestState { db: Arc, config: Config, clock: MockClock, + headers: HashMap, } impl Default for TestState { @@ -126,13 +128,14 @@ impl Default for TestState { db, config, clock: MockClock::default(), + headers: HashMap::new(), } } } impl TestState { async fn activate_leaf_at_session( - &self, + &mut self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex, block_number: BlockNumber, @@ -149,6 +152,8 @@ impl TestState { }; let block_hash = block_header.hash(); + let _ = self.headers.insert(block_hash, block_header.clone()); + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate::start_work(ActivatedLeaf { hash: block_hash, @@ -158,6 +163,16 @@ impl TestState { }) ))).await; + self.handle_sync_queries(virtual_overseer, block_hash, block_header, session).await; + } + + async fn handle_sync_queries( + &self, + virtual_overseer: &mut VirtualOverseer, + block_hash: Hash, + block_header: Header, + session: SessionIndex, + ) { assert_matches!( virtual_overseer.recv().await, AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { @@ -172,6 +187,7 @@ impl TestState { h, RuntimeApiRequest::SessionIndexForChild(tx), )) => { + let parent_hash = session_to_hash(session, b"parent"); assert_eq!(h, parent_hash); let _ = tx.send(Ok(session)); } @@ -194,6 +210,21 @@ impl TestState { } } + async fn handle_resume_sync(&self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex) { + let leaves: Vec = self.headers.keys().cloned().collect(); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ChainSelection(ChainSelectionMessage::Leaves(tx)) => { + tx.send(leaves.clone()).unwrap(); + } + ); + + for leaf in leaves { + let header = self.headers.get(&leaf).unwrap().clone(); + self.handle_sync_queries(virtual_overseer, leaf, header, session).await; + } + } + fn session_info(&self) -> SessionInfo { let discovery_keys = self.validators.iter() .map(|k| <_>::from(k.public())) @@ -259,9 +290,11 @@ fn test_harness(test: F) #[test] fn conflicting_votes_lead_to_dispute_participation() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -379,9 +412,11 @@ fn conflicting_votes_lead_to_dispute_participation() { #[test] fn positive_votes_dont_trigger_participation() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -482,9 +517,11 @@ fn positive_votes_dont_trigger_participation() { #[test] fn wrong_validator_index_is_ignored() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -552,9 +589,11 @@ fn wrong_validator_index_is_ignored() { #[test] fn finality_votes_ignore_disputed_candidates() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -644,9 +683,11 @@ fn finality_votes_ignore_disputed_candidates() { #[test] fn supermajority_valid_dispute_may_be_finalized() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -688,7 +729,23 @@ fn supermajority_valid_dispute_may_be_finalized() { }, }).await; - let _ = virtual_overseer.recv().await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + candidate_hash: c_hash, + candidate_receipt: c_receipt, + session: s, + report_availability, + .. + } + ) => { + assert_eq!(candidate_hash, c_hash); + assert_eq!(candidate_receipt, c_receipt); + assert_eq!(session, s); + report_availability.send(true).unwrap(); + } + ); let mut statements = Vec::new(); for i in (0..supermajority_threshold - 1).map(|i| i + 2) { @@ -753,9 +810,11 @@ fn supermajority_valid_dispute_may_be_finalized() { #[test] fn concluded_supermajority_for_non_active_after_time() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -859,9 +918,11 @@ fn concluded_supermajority_for_non_active_after_time() { #[test] fn concluded_supermajority_against_non_active_after_time() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -965,9 +1026,11 @@ fn concluded_supermajority_against_non_active_after_time() { #[test] fn fresh_dispute_ignored_if_unavailable() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); From e285a22a024ac507ee8eaa1f819f3001d24d21b7 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Thu, 15 Jul 2021 20:47:37 -0400 Subject: [PATCH 2/8] node/dispute-coordinator: Add resume function to TestState and modify Harness This commit modifies the TestHarness to return a TestState. We subsequently define a resume function on TestState that allows to interrupt the test and test specifically for behavior on startup of the subsystem. --- node/core/dispute-coordinator/src/tests.rs | 52 +++++++++++++++------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 582917e3f36a..25ab1ee42b4c 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -267,25 +267,29 @@ impl TestState { public, ).await.unwrap().unwrap() } -} - -fn test_harness(test: F) - where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, ()> -{ - let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); - let state = TestState::default(); - let subsystem = DisputeCoordinatorSubsystem::new( - state.db.clone(), - state.config.clone(), - state.subsystem_keystore.clone(), - ); + fn resume(self, test: F) -> Self + where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState> + { + let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); + let subsystem = DisputeCoordinatorSubsystem::new( + self.db.clone(), + self.config.clone(), + self.subsystem_keystore.clone(), + ); + let backend = DbBackend::new(self.db.clone(), self.config.column_config()); + let subsystem_task = run(subsystem, ctx, backend, Box::new(self.clock.clone())); + let test_task = test(self, ctx_handle); - let backend = DbBackend::new(state.db.clone(), state.config.column_config()); - let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone())); - let test_task = test(state, ctx_handle); + let (_, state) = futures::executor::block_on(future::join(subsystem_task, test_task)); + state + } +} - futures::executor::block_on(future::join(subsystem_task, test_task)); +fn test_harness(test: F) -> TestState + where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState> +{ + TestState::default().resume(test) } #[test] @@ -407,6 +411,8 @@ fn conflicting_votes_lead_to_dispute_participation() { // This confirms that the second vote doesn't lead to participation again. assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -512,6 +518,8 @@ fn positive_votes_dont_trigger_participation() { // This confirms that no participation request is made. assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -584,6 +592,8 @@ fn wrong_validator_index_is_ignored() { // This confirms that no participation request is made. assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -678,6 +688,8 @@ fn finality_votes_ignore_disputed_candidates() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -805,6 +817,8 @@ fn supermajority_valid_dispute_may_be_finalized() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -913,6 +927,8 @@ fn concluded_supermajority_for_non_active_after_time() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -1021,6 +1037,8 @@ fn concluded_supermajority_against_non_active_after_time() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } @@ -1100,5 +1118,7 @@ fn fresh_dispute_ignored_if_unavailable() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } From ee69eafdc40242ee2a60515032eefdf9a6c03347 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Thu, 15 Jul 2021 20:49:31 -0400 Subject: [PATCH 3/8] node/dispute-coordinator: Implement resume functionality This commit implements the resume functionality for the subsystem. In addition, we will forward any DisputeParticipation::Participate message in order to ensure that disputes for which we do not have local statements may be recovered in due time. --- node/core/dispute-coordinator/src/lib.rs | 71 ++++++++++++++ node/core/dispute-coordinator/src/tests.rs | 104 +++++++++++++++++++++ 2 files changed, 175 insertions(+) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 2cb88e72a8db..0b6b69c3a3c6 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -354,6 +354,10 @@ where // Restores the subsystem's state before proceeding with the main event loop. Primarily, this // repopulates the rolling session window the relevant session information to handle incoming // import statement requests. +// +// This method also retransmits a DisputeParticiationMessage::Participate for any non-concluded +// disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an +// arbitration on the disupte. async fn resume( ctx: &mut Context, state: &mut State, @@ -377,6 +381,70 @@ where let write_ops = overlay_db.into_write_ops(); backend.write(write_ops)?; + let recent_disputes = match backend.load_recent_disputes() { + Ok(Some(disputes)) => disputes, + Ok(None) => return Ok(()), + Err(e) => { + tracing::error!(target: LOG_TARGET, "failed initial load of recent disputes: {:?}", e); + return Err(e.into()); + }, + }; + + // Filter out disputes that have already concluded. + let active_disputes = recent_disputes.into_iter() + .filter(|((_, _), status)| status.concluded_at().is_none()) + .collect::(); + + for ((session, ref candidate_hash), _) in active_disputes.into_iter() { + let votes: CandidateVotes = match backend.load_candidate_votes(session, candidate_hash) { + Ok(Some(votes)) => votes.into(), + Ok(None) => continue, + Err(e) => { + tracing::error!(target: LOG_TARGET, "failed initial load of candidate votes: {:?}", e); + continue; + }, + }; + + let validators = match state.rolling_session_window.session_info(session) { + None => { + tracing::warn!( + target: LOG_TARGET, + session, + "Missing info for session which has an active dispute", + ); + + continue; + } + Some(info) => info.validators.clone(), + }; + + let n_validators = validators.len(); + let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect(); + for (index, validator) in validators.iter().enumerate() { + let index = ValidatorIndex(index as _); + if voted_indices.contains(&index) { continue } + + // Skip non-local validators. + if state.keystore.key_pair::(validator).ok().flatten().is_none() { + continue + } + + // Send a DisputeParticipationMessage for all non-concluded disputes that for which + // we have not recorded a local statement. + let (report_availability, receive_availability) = oneshot::channel(); + ctx.send_message(DisputeParticipationMessage::Participate { + candidate_hash: *candidate_hash, + candidate_receipt: votes.candidate_receipt.clone(), + session, + n_validators: n_validators as u32, + report_availability, + }).await; + + // TODO(ladi): do we care about the response here? + let _ = receive_availability.await; + } + } + Ok(()) } @@ -702,6 +770,9 @@ async fn handle_import_statements( overlay_db.write_candidate_votes(session, candidate_hash, votes.into()); + // TODO(ladi): should we respond with ImportStatementsResult::ValidImport? + //pending_confirmation.send(ImportStatementsResult::ValidImport) .map_err(|_| Error::OneshotSend)?; + Ok(()) } diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 25ab1ee42b4c..1b069c9ee942 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -1122,3 +1122,107 @@ fn fresh_dispute_ignored_if_unavailable() { test_state })); } + +#[test] +fn resume_dispute_without_local_statement() { + let session = 1; + + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + candidate_hash: c_hash, + candidate_receipt: c_receipt, + session: s, + report_availability, + .. + } + ) => { + assert_eq!(candidate_hash, c_hash); + assert_eq!(candidate_receipt, c_receipt); + assert_eq!(session, s); + report_availability.send(true).unwrap(); + } + ); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} From e37d6d717f2149480cc299a3c2bb7b41499b59b7 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Sun, 18 Jul 2021 16:33:22 -0400 Subject: [PATCH 4/8] Address Feedback --- node/core/dispute-coordinator/src/lib.rs | 92 +++++++++++----------- node/core/dispute-coordinator/src/tests.rs | 30 ++++--- 2 files changed, 66 insertions(+), 56 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 0b6b69c3a3c6..e3f464366d61 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -35,7 +35,7 @@ use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, messages::{ ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, - DisputeParticipationMessage, ImportStatementsResult, ChainSelectionMessage, + DisputeParticipationMessage, ImportStatementsResult, } }; use polkadot_node_subsystem_util::rolling_session_window::{ @@ -277,7 +277,7 @@ where B: Backend, { loop { - let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await; + let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await; match res { Err(e) => { e.trace(); @@ -299,7 +299,7 @@ where // // A return value of `Ok` indicates that an exit should be made, while non-fatal errors // lead to another call to this function. -async fn run_iteration( +async fn run_until_error( ctx: &mut Context, subsystem: &DisputeCoordinatorSubsystem, backend: &mut B, @@ -316,8 +316,6 @@ where rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), }; - resume(ctx, &mut state, backend).await?; - loop { let mut overlay_db = OverlayedBackend::new(backend); match ctx.recv().await? { @@ -325,12 +323,12 @@ where return Ok(()) } FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { - handle_new_activations( + handle_leaf( ctx, &mut overlay_db, &mut state, update.activated.into_iter().map(|a| a.hash), - ).await? + ).await?; } FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {}, FromOverseer::Communication { msg } => { @@ -355,53 +353,44 @@ where // repopulates the rolling session window the relevant session information to handle incoming // import statement requests. // -// This method also retransmits a DisputeParticiationMessage::Participate for any non-concluded +// This method also retransmits a `DisputeParticiationMessage::Participate` for any non-concluded // disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an -// arbitration on the disupte. -async fn resume( +// arbitration on the dispute. +async fn handle_leaf( ctx: &mut Context, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, state: &mut State, - backend: &mut B, + leaves: impl IntoIterator, ) -> Result<(), Error> where Context: overseer::SubsystemContext, Context: SubsystemContext, - B: Backend, { - - // Query for the set of active leaves to reconstruct our rolling session window. - let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainSelectionMessage::Leaves(tx)).await; - let leaves = rx.await?; - // Process the leaves as we would during normal operation. Any writes are committed // immediately so that they are made available to the primary event loop. - let mut overlay_db = OverlayedBackend::new(backend); - handle_new_activations(ctx, &mut overlay_db, state, leaves).await.unwrap(); - let write_ops = overlay_db.into_write_ops(); - backend.write(write_ops)?; + handle_new_activations(ctx, overlay_db, state, leaves).await?; - let recent_disputes = match backend.load_recent_disputes() { + let recent_disputes = match overlay_db.load_recent_disputes() { Ok(Some(disputes)) => disputes, Ok(None) => return Ok(()), Err(e) => { - tracing::error!(target: LOG_TARGET, "failed initial load of recent disputes: {:?}", e); + tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e); return Err(e.into()); }, }; // Filter out disputes that have already concluded. let active_disputes = recent_disputes.into_iter() - .filter(|((_, _), status)| status.concluded_at().is_none()) + .filter(|(_, status)| *status == DisputeStatus::Active) .collect::(); for ((session, ref candidate_hash), _) in active_disputes.into_iter() { - let votes: CandidateVotes = match backend.load_candidate_votes(session, candidate_hash) { + let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) { Ok(Some(votes)) => votes.into(), Ok(None) => continue, Err(e) => { - tracing::error!(target: LOG_TARGET, "failed initial load of candidate votes: {:?}", e); - continue; + tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e); + continue }, }; @@ -412,25 +401,30 @@ where session, "Missing info for session which has an active dispute", ); - - continue; + continue } Some(info) => info.validators.clone(), }; let n_validators = validators.len(); let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect(); - for (index, validator) in validators.iter().enumerate() { - let index = ValidatorIndex(index as _); - if voted_indices.contains(&index) { continue } - // Skip non-local validators. - if state.keystore.key_pair::(validator).ok().flatten().is_none() { - continue - } - - // Send a DisputeParticipationMessage for all non-concluded disputes that for which - // we have not recorded a local statement. + // Determine if there are any missing local statements for this dispute. Validators are + // filtered if: + // 1) their statement already exists, or + // 2) the validator key is not in the local keystore (i.e. the validator is remote). + // The remaining set only contains local validators that are also missing statements. + let missing_local_statement = validators.iter() + .enumerate() + .map(|(index, validator)| (ValidatorIndex(index as _), validator)) + .filter(|(index, validator)| + voted_indices.contains(index) || + state.keystore.key_pair::(validator).map(|v| v.is_none()).unwrap_or_default()) + .count() > 0; + + // Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a + // recorded local statement. + if missing_local_statement { let (report_availability, receive_availability) = oneshot::channel(); ctx.send_message(DisputeParticipationMessage::Participate { candidate_hash: *candidate_hash, @@ -440,8 +434,9 @@ where report_availability, }).await; - // TODO(ladi): do we care about the response here? - let _ = receive_availability.await; + if !receive_availability.await? { + tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available"); + } } } @@ -569,8 +564,8 @@ async fn handle_incoming( ) => { issue_local_statement( ctx, - state, overlay_db, + state, candidate_hash, candidate_receipt, session, @@ -646,6 +641,10 @@ async fn handle_import_statements( "Missing info for session which has an active dispute", ); + pending_confirmation + .send(ImportStatementsResult::InvalidImport) + .map_err(|_| Error::OneshotSend)?; + return Ok(()) } Some(info) => info.validators.clone(), @@ -770,16 +769,17 @@ async fn handle_import_statements( overlay_db.write_candidate_votes(session, candidate_hash, votes.into()); - // TODO(ladi): should we respond with ImportStatementsResult::ValidImport? - //pending_confirmation.send(ImportStatementsResult::ValidImport) .map_err(|_| Error::OneshotSend)?; + pending_confirmation + .send(ImportStatementsResult::ValidImport) + .map_err(|_| Error::OneshotSend)?; Ok(()) } async fn issue_local_statement( ctx: &mut impl SubsystemContext, - state: &mut State, overlay_db: &mut OverlayedBackend<'_, impl Backend>, + state: &mut State, candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 1b069c9ee942..30b8e5d31ca1 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -212,16 +212,24 @@ impl TestState { async fn handle_resume_sync(&self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex) { let leaves: Vec = self.headers.keys().cloned().collect(); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::ChainSelection(ChainSelectionMessage::Leaves(tx)) => { - tx.send(leaves.clone()).unwrap(); - } - ); + for leaf in leaves.iter() { + virtual_overseer.send( + FromOverseer::Signal( + OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work( + ActivatedLeaf { + hash: *leaf, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + } + ) + ) + ) + ).await; - for leaf in leaves { - let header = self.headers.get(&leaf).unwrap().clone(); - self.handle_sync_queries(virtual_overseer, leaf, header, session).await; + let header = self.headers.get(leaf).unwrap().clone(); + self.handle_sync_queries(virtual_overseer, *leaf, header, session).await; } } @@ -1153,7 +1161,7 @@ fn resume_dispute_without_local_statement() { false, ).await; - let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + let (pending_confirmation, confirmation_rx) = oneshot::channel(); virtual_overseer.send(FromOverseer::Communication { msg: DisputeCoordinatorMessage::ImportStatements { candidate_hash, @@ -1179,6 +1187,8 @@ fn resume_dispute_without_local_statement() { } ); + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + { let (tx, rx) = oneshot::channel(); From 93b0e5aa46c10e2d1c3417ddc7d26c4d3b36e37d Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 19 Jul 2021 14:03:45 -0400 Subject: [PATCH 5/8] Modify to run handle_leaf on first import --- node/core/dispute-coordinator/src/lib.rs | 40 ++++++++++++++++++++---- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index e3f464366d61..81cf2080b1ae 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -71,10 +71,27 @@ const ACTIVE_DURATION_SECS: Timestamp = 180; /// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots. type Timestamp = u64; +#[derive(Eq, PartialEq)] +enum Recovery { + Pending, + Complete, +} + +impl Recovery { + fn complete(&mut self) -> bool { + let complete = *self == Recovery::Complete; + if !complete { + *self = Recovery::Complete + } + complete + } +} + struct State { keystore: Arc, highest_session: Option, rolling_session_window: RollingSessionWindow, + recovery_state: Recovery, } /// Configuration for the dispute coordinator subsystem. @@ -314,6 +331,7 @@ where keystore: subsystem.keystore.clone(), highest_session: None, rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), + recovery_state: Recovery::Pending, }; loop { @@ -323,12 +341,22 @@ where return Ok(()) } FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { - handle_leaf( - ctx, - &mut overlay_db, - &mut state, - update.activated.into_iter().map(|a| a.hash), - ).await?; + let leaves = update.activated.into_iter().map(|a| a.hash); + if !state.recovery_state.complete() { + handle_leaf( + ctx, + &mut overlay_db, + &mut state, + leaves, + ).await?; + } else { + handle_new_activations( + ctx, + &mut overlay_db, + &mut state, + leaves, + ).await?; + } } FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {}, FromOverseer::Communication { msg } => { From d09e2fecb5bb4c20f08155245cb4c189f8180189 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 19 Jul 2021 15:55:36 -0400 Subject: [PATCH 6/8] Modify missing_local_statement logic --- node/core/dispute-coordinator/src/lib.rs | 47 +++++++++++------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 81cf2080b1ae..50d2d7f78253 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -72,16 +72,16 @@ const ACTIVE_DURATION_SECS: Timestamp = 180; type Timestamp = u64; #[derive(Eq, PartialEq)] -enum Recovery { +enum Participation { Pending, Complete, } -impl Recovery { +impl Participation { fn complete(&mut self) -> bool { - let complete = *self == Recovery::Complete; + let complete = *self == Participation::Complete; if !complete { - *self = Recovery::Complete + *self = Participation::Complete } complete } @@ -91,7 +91,7 @@ struct State { keystore: Arc, highest_session: Option, rolling_session_window: RollingSessionWindow, - recovery_state: Recovery, + recovery_state: Participation, } /// Configuration for the dispute coordinator subsystem. @@ -331,7 +331,7 @@ where keystore: subsystem.keystore.clone(), highest_session: None, rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), - recovery_state: Recovery::Pending, + recovery_state: Participation::Pending, }; loop { @@ -341,20 +341,17 @@ where return Ok(()) } FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { - let leaves = update.activated.into_iter().map(|a| a.hash); + handle_new_activations( + ctx, + &mut overlay_db, + &mut state, + update.activated.into_iter().map(|a| a.hash), + ).await?; if !state.recovery_state.complete() { - handle_leaf( - ctx, - &mut overlay_db, - &mut state, - leaves, - ).await?; - } else { - handle_new_activations( + handle_startup( ctx, &mut overlay_db, &mut state, - leaves, ).await?; } } @@ -384,20 +381,15 @@ where // This method also retransmits a `DisputeParticiationMessage::Participate` for any non-concluded // disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an // arbitration on the dispute. -async fn handle_leaf( +async fn handle_startup( ctx: &mut Context, overlay_db: &mut OverlayedBackend<'_, impl Backend>, state: &mut State, - leaves: impl IntoIterator, ) -> Result<(), Error> where Context: overseer::SubsystemContext, Context: SubsystemContext, { - // Process the leaves as we would during normal operation. Any writes are committed - // immediately so that they are made available to the primary event loop. - handle_new_activations(ctx, overlay_db, state, leaves).await?; - let recent_disputes = match overlay_db.load_recent_disputes() { Ok(Some(disputes)) => disputes, Ok(None) => return Ok(()), @@ -445,10 +437,13 @@ where let missing_local_statement = validators.iter() .enumerate() .map(|(index, validator)| (ValidatorIndex(index as _), validator)) - .filter(|(index, validator)| - voted_indices.contains(index) || - state.keystore.key_pair::(validator).map(|v| v.is_none()).unwrap_or_default()) - .count() > 0; + .any(|(index, validator)| + !voted_indices.contains(&index) && + state.keystore + .key_pair::(validator) + .ok() + .map_or(false, |v| v.is_some()) + ); // Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a // recorded local statement. From 2dbd4162d6251ff9cc28fb984a05b7c4bc18ef58 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Tue, 20 Jul 2021 15:17:41 -0400 Subject: [PATCH 7/8] node/dispute-coordinator: Add simple test to ensure we adequately handle local_statements that are not missing. --- node/core/dispute-coordinator/src/tests.rs | 97 ++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 30b8e5d31ca1..e2a9617c35e8 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use super::*; +use overseer::TimeoutExt; use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo}; use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus}; use polkadot_node_subsystem::messages::{ @@ -1236,3 +1237,99 @@ fn resume_dispute_without_local_statement() { test_state })); } + +#[test] +fn resume_dispute_with_local_statement() { + let session = 1; + + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let local_valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (local_valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Assert that subsystem is not sending Participation messages because we issued a local statement + assert!(virtual_overseer.recv().timeout(std::time::Duration::from_secs(2)).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} From d60d34c111c708a7528504a0dd34cd97c865df73 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Tue, 20 Jul 2021 15:42:32 -0400 Subject: [PATCH 8/8] Add missing keystore tests --- Cargo.lock | 1 + node/core/dispute-coordinator/Cargo.toml | 1 + node/core/dispute-coordinator/src/tests.rs | 251 ++++++++++++++++++++- 3 files changed, 250 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57f71119322e..6cd78e548f73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6271,6 +6271,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", + "polkadot-overseer", "polkadot-primitives", "sc-keystore", "sp-core", diff --git a/node/core/dispute-coordinator/Cargo.toml b/node/core/dispute-coordinator/Cargo.toml index 72af0db6ed62..71024dd20168 100644 --- a/node/core/dispute-coordinator/Cargo.toml +++ b/node/core/dispute-coordinator/Cargo.toml @@ -27,3 +27,4 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } assert_matches = "1.4.0" +polkadot-overseer = { path = "../../overseer" } diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index e2a9617c35e8..b3610df2f043 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -35,6 +35,9 @@ use parity_scale_codec::Encode; use assert_matches::assert_matches; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use std::time::Duration; + +const TEST_TIMEOUT: Duration = Duration::from_secs(2); // sets up a keystore with the given keyring accounts. fn make_keystore(accounts: &[Sr25519Keyring]) -> LocalKeystore { @@ -1231,8 +1234,63 @@ fn resume_dispute_without_local_statement() { } ); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - assert!(virtual_overseer.try_recv().await.is_none()); + let valid_vote0 = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + let valid_vote3 = test_state.issue_statement_with_index( + 3, + candidate_hash, + session, + true, + ).await; + let valid_vote4 = test_state.issue_statement_with_index( + 4, + candidate_hash, + session, + true, + ).await; + let valid_vote5 = test_state.issue_statement_with_index( + 5, + candidate_hash, + session, + true, + ).await; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote0, ValidatorIndex(0)), + (valid_vote3, ValidatorIndex(3)), + (valid_vote4, ValidatorIndex(4)), + (valid_vote5, ValidatorIndex(5)), + ], + pending_confirmation, + }, + }).await; + + // Advance the clock far enough so that the concluded dispute will be omitted from an + // ActiveDisputes query. + test_state.clock.set(test_state.clock.now() + ACTIVE_DURATION_SECS + 1 ); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert!(rx.await.unwrap().is_empty()); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); test_state })); @@ -1325,7 +1383,194 @@ fn resume_dispute_with_local_statement() { test_state.handle_resume_sync(&mut virtual_overseer, session).await; // Assert that subsystem is not sending Participation messages because we issued a local statement - assert!(virtual_overseer.recv().timeout(std::time::Duration::from_secs(2)).await.is_none()); + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} + +#[test] +fn resume_dispute_without_local_statement_or_local_key() { + let session = 1; + let mut test_state = TestState::default(); + test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into(); + test_state.resume(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Assert that subsystem is not sending Participation messages because we issued a local statement + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} + +#[test] +fn resume_dispute_with_local_statement_without_local_key() { + let session = 1; + + let mut test_state = TestState::default(); + test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into(); + test_state.resume(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let local_valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (local_valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Assert that subsystem is not sending Participation messages because we issued a local statement + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none());