From cc6ef8cbd76d263c9a11509ac5ad0b216d1aa2f4 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Wed, 21 Jul 2021 17:36:43 -0400 Subject: [PATCH] Dispute coordinator - Recover disputes on startup (#3481) * node/dispute-coordinator: Introduce resume capability This commit introduces a resume capability for the dispute coordinator subsystem. Specifically, this will allow to recover data for disputes for which we have no local statements. * node/dispute-coordinator: Add resume function to TestState and modify Harness This commit modifies the TestHarness to return a TestState. We subsequently define a resume function on TestState that allows to interrupt the test and test specifically for behavior on startup of the subsystem. * node/dispute-coordinator: Implement resume functionality This commit implements the resume functionality for the subsystem. In addition, we will forward any DisputeParticipation::Participate message in order to ensure that disputes for which we do not have local statements may be recovered in due time. * Address Feedback * Modify to run handle_leaf on first import * Modify missing_local_statement logic * node/dispute-coordinator: Add simple test to ensure we adequately handle local_statements that are not missing. * Add missing keystore tests --- Cargo.lock | 1 + node/core/dispute-coordinator/Cargo.toml | 1 + node/core/dispute-coordinator/src/lib.rs | 137 ++++- node/core/dispute-coordinator/src/tests.rs | 591 ++++++++++++++++++++- 4 files changed, 698 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e435d788c4a4..ac235663836e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6337,6 +6337,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", + "polkadot-overseer", "polkadot-primitives", "sc-keystore", "sp-core", diff --git a/node/core/dispute-coordinator/Cargo.toml b/node/core/dispute-coordinator/Cargo.toml index 72af0db6ed62..71024dd20168 100644 --- a/node/core/dispute-coordinator/Cargo.toml +++ b/node/core/dispute-coordinator/Cargo.toml @@ -27,3 +27,4 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } assert_matches = "1.4.0" +polkadot-overseer = { path = "../../overseer" } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index f2b956f3fe6d..50d2d7f78253 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -35,7 +35,7 @@ use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, messages::{ ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, - DisputeParticipationMessage, ImportStatementsResult + DisputeParticipationMessage, ImportStatementsResult, } }; use polkadot_node_subsystem_util::rolling_session_window::{ @@ -71,10 +71,27 @@ const ACTIVE_DURATION_SECS: Timestamp = 180; /// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots. type Timestamp = u64; +#[derive(Eq, PartialEq)] +enum Participation { + Pending, + Complete, +} + +impl Participation { + fn complete(&mut self) -> bool { + let complete = *self == Participation::Complete; + if !complete { + *self = Participation::Complete + } + complete + } +} + struct State { keystore: Arc, highest_session: Option, rolling_session_window: RollingSessionWindow, + recovery_state: Participation, } /// Configuration for the dispute coordinator subsystem. @@ -277,7 +294,7 @@ where B: Backend, { loop { - let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await; + let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await; match res { Err(e) => { e.trace(); @@ -299,7 +316,7 @@ where // // A return value of `Ok` indicates that an exit should be made, while non-fatal errors // lead to another call to this function. -async fn run_iteration( +async fn run_until_error( ctx: &mut Context, subsystem: &DisputeCoordinatorSubsystem, backend: &mut B, @@ -314,6 +331,7 @@ where keystore: subsystem.keystore.clone(), highest_session: None, rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), + recovery_state: Participation::Pending, }; loop { @@ -328,7 +346,14 @@ where &mut overlay_db, &mut state, update.activated.into_iter().map(|a| a.hash), - ).await? + ).await?; + if !state.recovery_state.complete() { + handle_startup( + ctx, + &mut overlay_db, + &mut state, + ).await?; + } } FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {}, FromOverseer::Communication { msg } => { @@ -349,6 +374,98 @@ where } } +// Restores the subsystem's state before proceeding with the main event loop. Primarily, this +// repopulates the rolling session window the relevant session information to handle incoming +// import statement requests. +// +// This method also retransmits a `DisputeParticiationMessage::Participate` for any non-concluded +// disputes for which the subsystem doesn't have a local statement, ensuring it eventually makes an +// arbitration on the dispute. +async fn handle_startup( + ctx: &mut Context, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, + state: &mut State, +) -> Result<(), Error> +where + Context: overseer::SubsystemContext, + Context: SubsystemContext, +{ + let recent_disputes = match overlay_db.load_recent_disputes() { + Ok(Some(disputes)) => disputes, + Ok(None) => return Ok(()), + Err(e) => { + tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e); + return Err(e.into()); + }, + }; + + // Filter out disputes that have already concluded. + let active_disputes = recent_disputes.into_iter() + .filter(|(_, status)| *status == DisputeStatus::Active) + .collect::(); + + for ((session, ref candidate_hash), _) in active_disputes.into_iter() { + let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) { + Ok(Some(votes)) => votes.into(), + Ok(None) => continue, + Err(e) => { + tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e); + continue + }, + }; + + let validators = match state.rolling_session_window.session_info(session) { + None => { + tracing::warn!( + target: LOG_TARGET, + session, + "Missing info for session which has an active dispute", + ); + continue + } + Some(info) => info.validators.clone(), + }; + + let n_validators = validators.len(); + let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect(); + + // Determine if there are any missing local statements for this dispute. Validators are + // filtered if: + // 1) their statement already exists, or + // 2) the validator key is not in the local keystore (i.e. the validator is remote). + // The remaining set only contains local validators that are also missing statements. + let missing_local_statement = validators.iter() + .enumerate() + .map(|(index, validator)| (ValidatorIndex(index as _), validator)) + .any(|(index, validator)| + !voted_indices.contains(&index) && + state.keystore + .key_pair::(validator) + .ok() + .map_or(false, |v| v.is_some()) + ); + + // Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a + // recorded local statement. + if missing_local_statement { + let (report_availability, receive_availability) = oneshot::channel(); + ctx.send_message(DisputeParticipationMessage::Participate { + candidate_hash: *candidate_hash, + candidate_receipt: votes.candidate_receipt.clone(), + session, + n_validators: n_validators as u32, + report_availability, + }).await; + + if !receive_availability.await? { + tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available"); + } + } + } + + Ok(()) +} + async fn handle_new_activations( ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), overlay_db: &mut OverlayedBackend<'_, impl Backend>, @@ -470,8 +587,8 @@ async fn handle_incoming( ) => { issue_local_statement( ctx, - state, overlay_db, + state, candidate_hash, candidate_receipt, session, @@ -547,6 +664,10 @@ async fn handle_import_statements( "Missing info for session which has an active dispute", ); + pending_confirmation + .send(ImportStatementsResult::InvalidImport) + .map_err(|_| Error::OneshotSend)?; + return Ok(()) } Some(info) => info.validators.clone(), @@ -671,13 +792,17 @@ async fn handle_import_statements( overlay_db.write_candidate_votes(session, candidate_hash, votes.into()); + pending_confirmation + .send(ImportStatementsResult::ValidImport) + .map_err(|_| Error::OneshotSend)?; + Ok(()) } async fn issue_local_statement( ctx: &mut impl SubsystemContext, - state: &mut State, overlay_db: &mut OverlayedBackend<'_, impl Backend>, + state: &mut State, candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 92ca2f6ffe3e..b3610df2f043 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -14,8 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use std::collections::HashMap; use super::*; +use overseer::TimeoutExt; use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo}; use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus}; use polkadot_node_subsystem::messages::{ @@ -33,6 +35,9 @@ use parity_scale_codec::Encode; use assert_matches::assert_matches; use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use std::time::Duration; + +const TEST_TIMEOUT: Duration = Duration::from_secs(2); // sets up a keystore with the given keyring accounts. fn make_keystore(accounts: &[Sr25519Keyring]) -> LocalKeystore { @@ -86,6 +91,7 @@ struct TestState { db: Arc, config: Config, clock: MockClock, + headers: HashMap, } impl Default for TestState { @@ -126,13 +132,14 @@ impl Default for TestState { db, config, clock: MockClock::default(), + headers: HashMap::new(), } } } impl TestState { async fn activate_leaf_at_session( - &self, + &mut self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex, block_number: BlockNumber, @@ -149,6 +156,8 @@ impl TestState { }; let block_hash = block_header.hash(); + let _ = self.headers.insert(block_hash, block_header.clone()); + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate::start_work(ActivatedLeaf { hash: block_hash, @@ -158,6 +167,16 @@ impl TestState { }) ))).await; + self.handle_sync_queries(virtual_overseer, block_hash, block_header, session).await; + } + + async fn handle_sync_queries( + &self, + virtual_overseer: &mut VirtualOverseer, + block_hash: Hash, + block_header: Header, + session: SessionIndex, + ) { assert_matches!( virtual_overseer.recv().await, AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => { @@ -172,6 +191,7 @@ impl TestState { h, RuntimeApiRequest::SessionIndexForChild(tx), )) => { + let parent_hash = session_to_hash(session, b"parent"); assert_eq!(h, parent_hash); let _ = tx.send(Ok(session)); } @@ -194,6 +214,29 @@ impl TestState { } } + async fn handle_resume_sync(&self, virtual_overseer: &mut VirtualOverseer, session: SessionIndex) { + let leaves: Vec = self.headers.keys().cloned().collect(); + for leaf in leaves.iter() { + virtual_overseer.send( + FromOverseer::Signal( + OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work( + ActivatedLeaf { + hash: *leaf, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + status: LeafStatus::Fresh, + } + ) + ) + ) + ).await; + + let header = self.headers.get(leaf).unwrap().clone(); + self.handle_sync_queries(virtual_overseer, *leaf, header, session).await; + } + } + fn session_info(&self) -> SessionInfo { let discovery_keys = self.validators.iter() .map(|k| <_>::from(k.public())) @@ -236,32 +279,38 @@ impl TestState { public, ).await.unwrap().unwrap() } -} - -fn test_harness(test: F) - where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, ()> -{ - let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); - let state = TestState::default(); - let subsystem = DisputeCoordinatorSubsystem::new( - state.db.clone(), - state.config.clone(), - state.subsystem_keystore.clone(), - ); + fn resume(self, test: F) -> Self + where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState> + { + let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); + let subsystem = DisputeCoordinatorSubsystem::new( + self.db.clone(), + self.config.clone(), + self.subsystem_keystore.clone(), + ); + let backend = DbBackend::new(self.db.clone(), self.config.column_config()); + let subsystem_task = run(subsystem, ctx, backend, Box::new(self.clock.clone())); + let test_task = test(self, ctx_handle); - let backend = DbBackend::new(state.db.clone(), state.config.column_config()); - let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone())); - let test_task = test(state, ctx_handle); + let (_, state) = futures::executor::block_on(future::join(subsystem_task, test_task)); + state + } +} - futures::executor::block_on(future::join(subsystem_task, test_task)); +fn test_harness(test: F) -> TestState + where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState> +{ + TestState::default().resume(test) } #[test] fn conflicting_votes_lead_to_dispute_participation() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -374,14 +423,18 @@ fn conflicting_votes_lead_to_dispute_participation() { // This confirms that the second vote doesn't lead to participation again. assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn positive_votes_dont_trigger_participation() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -477,14 +530,18 @@ fn positive_votes_dont_trigger_participation() { // This confirms that no participation request is made. assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn wrong_validator_index_is_ignored() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -547,14 +604,18 @@ fn wrong_validator_index_is_ignored() { // This confirms that no participation request is made. assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn finality_votes_ignore_disputed_candidates() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -639,14 +700,18 @@ fn finality_votes_ignore_disputed_candidates() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn supermajority_valid_dispute_may_be_finalized() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -688,7 +753,23 @@ fn supermajority_valid_dispute_may_be_finalized() { }, }).await; - let _ = virtual_overseer.recv().await; + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + candidate_hash: c_hash, + candidate_receipt: c_receipt, + session: s, + report_availability, + .. + } + ) => { + assert_eq!(candidate_hash, c_hash); + assert_eq!(candidate_receipt, c_receipt); + assert_eq!(session, s); + report_availability.send(true).unwrap(); + } + ); let mut statements = Vec::new(); for i in (0..supermajority_threshold - 1).map(|i| i + 2) { @@ -748,14 +829,18 @@ fn supermajority_valid_dispute_may_be_finalized() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn concluded_supermajority_for_non_active_after_time() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -854,14 +939,18 @@ fn concluded_supermajority_for_non_active_after_time() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn concluded_supermajority_against_non_active_after_time() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -960,14 +1049,18 @@ fn concluded_supermajority_against_non_active_after_time() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); } #[test] fn fresh_dispute_ignored_if_unavailable() { - test_harness(|test_state, mut virtual_overseer| Box::pin(async move { + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let candidate_receipt = CandidateReceipt::default(); let candidate_hash = candidate_receipt.hash(); @@ -1037,5 +1130,451 @@ fn fresh_dispute_ignored_if_unavailable() { virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} + +#[test] +fn resume_dispute_without_local_statement() { + let session = 1; + + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + candidate_hash: c_hash, + candidate_receipt: c_receipt, + session: s, + report_availability, + .. + } + ) => { + assert_eq!(candidate_hash, c_hash); + assert_eq!(candidate_receipt, c_receipt); + assert_eq!(session, s); + report_availability.send(true).unwrap(); + } + ); + + let valid_vote0 = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + let valid_vote3 = test_state.issue_statement_with_index( + 3, + candidate_hash, + session, + true, + ).await; + let valid_vote4 = test_state.issue_statement_with_index( + 4, + candidate_hash, + session, + true, + ).await; + let valid_vote5 = test_state.issue_statement_with_index( + 5, + candidate_hash, + session, + true, + ).await; + + let (pending_confirmation, _confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote0, ValidatorIndex(0)), + (valid_vote3, ValidatorIndex(3)), + (valid_vote4, ValidatorIndex(4)), + (valid_vote5, ValidatorIndex(5)), + ], + pending_confirmation, + }, + }).await; + + // Advance the clock far enough so that the concluded dispute will be omitted from an + // ActiveDisputes query. + test_state.clock.set(test_state.clock.now() + ACTIVE_DURATION_SECS + 1 ); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert!(rx.await.unwrap().is_empty()); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} + +#[test] +fn resume_dispute_with_local_statement() { + let session = 1; + + test_harness(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let local_valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (local_valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Assert that subsystem is not sending Participation messages because we issued a local statement + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} + +#[test] +fn resume_dispute_without_local_statement_or_local_key() { + let session = 1; + let mut test_state = TestState::default(); + test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into(); + test_state.resume(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Assert that subsystem is not sending Participation messages because we issued a local statement + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })); +} + +#[test] +fn resume_dispute_with_local_statement_without_local_key() { + let session = 1; + + let mut test_state = TestState::default(); + test_state.subsystem_keystore = make_keystore(&[Sr25519Keyring::Two]).into(); + test_state.resume(|mut test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = CandidateReceipt::default(); + let candidate_hash = candidate_receipt.hash(); + + test_state.activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + ).await; + + let local_valid_vote = test_state.issue_statement_with_index( + 0, + candidate_hash, + session, + true, + ).await; + + let valid_vote = test_state.issue_statement_with_index( + 1, + candidate_hash, + session, + true, + ).await; + + let invalid_vote = test_state.issue_statement_with_index( + 2, + candidate_hash, + session, + false, + ).await; + + let (pending_confirmation, confirmation_rx) = oneshot::channel(); + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_hash, + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (local_valid_vote, ValidatorIndex(0)), + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation, + }, + }).await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::DisputeParticipation( + DisputeParticipationMessage::Participate { + report_availability, + .. + } + ) => { + report_availability.send(true).unwrap(); + } + ); + + assert_eq!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); + + { + let (tx, rx) = oneshot::channel(); + + virtual_overseer.send(FromOverseer::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }).await; + + assert_eq!(rx.await.unwrap().len(), 1); + } + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state + })) + // Alice should send a DisputeParticiationMessage::Participate on restart since she has no + // local statement for the active dispute. + .resume(|test_state, mut virtual_overseer| Box::pin(async move { + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Assert that subsystem is not sending Participation messages because we issued a local statement + assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none()); + + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + assert!(virtual_overseer.try_recv().await.is_none()); + + test_state })); }