From 7c1e33101fbf0f0d71a53578ea77953ed57a6f93 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Tue, 21 Sep 2021 16:14:48 -0300 Subject: [PATCH 1/5] Send mined transaction IDs to the download and verify task for cancellation --- zebrad/src/components/mempool.rs | 18 ++++++++++++++++- zebrad/src/components/mempool/downloads.rs | 23 +++++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index a75f799bac5..0b3d9f68db4 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -17,7 +17,7 @@ use zebra_chain::{ use zebra_consensus::{error::TransactionError, transaction}; use zebra_network as zn; use zebra_state as zs; -use zs::ChainTipChange; +use zebra_state::{ChainTipChange, TipAction}; pub use crate::BoxError; @@ -142,6 +142,22 @@ impl Service for Mempool { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(tip_action) = self.chain_tip_change.tip_change().now_or_never() { + match tip_action? { + // Clear the mempool if there has been a chain tip reset. + TipAction::Reset { height: _, hash: _ } => { + // TODO: https://github.com/ZcashFoundation/zebra/pull/2777/ + } + // Cancel downloads/verifications of transactions with the same + // IDs as recently mined transactions. + TipAction::Grow { block } => { + for txid in block.transaction_hashes.iter() { + self.tx_downloads.cancel(txid); + } + } + } + } + // Clean up completed download tasks and add to mempool if successful while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) { if let Ok(tx) = r { diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 4c7efcdb553..6713ca3db31 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -16,7 +16,7 @@ use tokio::{sync::oneshot, task::JoinHandle}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; -use zebra_chain::transaction::{UnminedTx, UnminedTxId}; +use zebra_chain::transaction::{self, UnminedTx, UnminedTxId}; use zebra_consensus::transaction as tx; use zebra_network as zn; use zebra_state as zs; @@ -315,6 +315,27 @@ where Ok(()) } + /// Cancel download/verification tasks of transactions with the + /// given transaction hash (see [`UnminedTxId::mined_id`]). + pub fn cancel(&mut self, mined_id: &transaction::Hash) { + // TODO: this requires going through the entire list of running tasks. + // If this becomes an issue, another HashMap may be needed. + // TODO: this can be simplified with [`HashMap::drain_filter`] which + // is currently nightly-only experimental API. + let removed_txids: Vec = self + .cancel_handles + .keys() + .filter(|txid| txid.mined_id() == *mined_id) + .cloned() + .collect(); + + for txid in removed_txids { + if let Some(handle) = self.cancel_handles.remove(&txid) { + let _ = handle.send(()); + } + } + } + /// Check if transaction is already in the state. async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> { // Check if the transaction is already in the state. From 8aaf532798f634010fbcf73b1a116c1e844b89d4 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Fri, 24 Sep 2021 16:45:41 -0300 Subject: [PATCH 2/5] Pass a HashSet of transaction hashes to be cancelled --- zebrad/src/components/mempool.rs | 5 ++--- zebrad/src/components/mempool/downloads.rs | 8 +++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index a363929d5c4..5e77a8a671d 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -151,9 +151,8 @@ impl Service for Mempool { // Cancel downloads/verifications of transactions with the same // IDs as recently mined transactions. TipAction::Grow { block } => { - for txid in block.transaction_hashes.iter() { - self.tx_downloads.cancel(txid); - } + let txid_set = block.transaction_hashes.iter().collect(); + self.tx_downloads.cancel(txid_set); } } } diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 6713ca3db31..4f5869a151f 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, pin::Pin, task::{Context, Poll}, time::Duration, @@ -317,15 +317,13 @@ where /// Cancel download/verification tasks of transactions with the /// given transaction hash (see [`UnminedTxId::mined_id`]). - pub fn cancel(&mut self, mined_id: &transaction::Hash) { - // TODO: this requires going through the entire list of running tasks. - // If this becomes an issue, another HashMap may be needed. + pub fn cancel(&mut self, mined_ids: HashSet<&transaction::Hash>) { // TODO: this can be simplified with [`HashMap::drain_filter`] which // is currently nightly-only experimental API. let removed_txids: Vec = self .cancel_handles .keys() - .filter(|txid| txid.mined_id() == *mined_id) + .filter(|txid| mined_ids.contains(&txid.mined_id())) .cloned() .collect(); From 16a99676fea1cd92a6937743bd03091f64935a1c Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Mon, 27 Sep 2021 16:01:42 -0300 Subject: [PATCH 3/5] Add mempool_cancel_mined() test --- zebrad/src/components/mempool/downloads.rs | 5 + zebrad/src/components/mempool/tests.rs | 126 ++++++++++++++++++++- 2 files changed, 130 insertions(+), 1 deletion(-) diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 4f5869a151f..8d465f6cfb5 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -334,6 +334,11 @@ where } } + /// Get the number of currently in-flight download tasks. + pub fn in_flight(&self) -> usize { + self.pending.len() + } + /// Check if transaction is already in the state. async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> { // Check if the transaction is already in the state. diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 091557d071c..8b7de99868b 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -1,9 +1,11 @@ use super::*; use color_eyre::Report; -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; use storage::tests::unmined_transactions_in_blocks; use tower::{ServiceBuilder, ServiceExt}; +use zebra_chain::block::Block; +use zebra_chain::serialization::ZcashDeserializeInto; use zebra_consensus::Config as ConsensusConfig; use zebra_state::Config as StateConfig; use zebra_test::mock_service::MockService; @@ -219,3 +221,125 @@ async fn mempool_queue() -> Result<(), Report> { Ok(()) } + +#[tokio::test] +async fn mempool_cancel_mined() -> Result<(), Report> { + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let peer_set = MockService::build().for_unit_tests(); + let (sync_status, _recent_syncs) = SyncStatus::new(); + let (state, _latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let mut state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // Start the mempool service + let mut mempool = Mempool::new( + network, + Buffer::new(BoxService::new(peer_set), 1), + state_service.clone(), + tx_verifier, + sync_status, + chain_tip_change, + ); + + // Push the genesis block to the state + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Queue transaction from block 2 for download + let txid = block2.transactions[0].unmined_id(); + let response = mempool + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(mempool.tx_downloads.in_flight(), 1); + + // Query the mempool to make it poll chain_tip_change + for _ in 0..10 { + let _response = mempool + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + } + + // Push block 1 to the state + println!("Comitting block 1..."); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + block1.clone().into(), + )) + .await + .unwrap(); + + // Query the mempool to make it poll chain_tip_change + let _response = mempool + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + + // Push block 2 to the state + println!("Comitting block 2..."); + state_service + .oneshot(zebra_state::Request::CommitFinalizedBlock( + block2.clone().into(), + )) + .await + .unwrap(); + + for _ in 0..10 { + // Query the mempool just to poll it and make it cancel the download. + let _response = mempool + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + } + + // Check if download was cancelled. + assert_eq!(mempool.tx_downloads.in_flight(), 0); + + Ok(()) +} From 337dc3476413ad848f92e72b04f5fdca807de91c Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 29 Sep 2021 16:15:57 -0300 Subject: [PATCH 4/5] Fix starvation in test --- zebrad/src/components/mempool/tests.rs | 28 +++++++++++++++----------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 8b7de99868b..18176992504 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -2,6 +2,7 @@ use super::*; use color_eyre::Report; use std::{collections::HashSet, sync::Arc}; use storage::tests::unmined_transactions_in_blocks; +use tokio::time; use tower::{ServiceBuilder, ServiceExt}; use zebra_chain::block::Block; @@ -245,6 +246,8 @@ async fn mempool_cancel_mined() -> Result<(), Report> { zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) .await; + time::pause(); + // Start the mempool service let mut mempool = Mempool::new( network, @@ -287,18 +290,15 @@ async fn mempool_cancel_mined() -> Result<(), Report> { assert_eq!(mempool.tx_downloads.in_flight(), 1); // Query the mempool to make it poll chain_tip_change - for _ in 0..10 { - let _response = mempool - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await - .unwrap(); - } + let _response = mempool + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); // Push block 1 to the state - println!("Comitting block 1..."); state_service .ready_and() .await @@ -319,7 +319,6 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .unwrap(); // Push block 2 to the state - println!("Comitting block 2..."); state_service .oneshot(zebra_state::Request::CommitFinalizedBlock( block2.clone().into(), @@ -327,7 +326,10 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .await .unwrap(); - for _ in 0..10 { + // This is done twice because after the first query the cancellation + // is picked up by select!, and after the second the mempool gets the + // result and the download future is removed. + for _ in 0..2 { // Query the mempool just to poll it and make it cancel the download. let _response = mempool .ready_and() @@ -336,6 +338,8 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .call(Request::TransactionIds) .await .unwrap(); + // Sleep to avoid starvation and make sure the cancellation is picked up. + time::sleep(time::Duration::from_millis(100)).await; } // Check if download was cancelled. From 13bc5d2c8dd841919a05f733c2487d0447db8482 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 29 Sep 2021 17:24:51 -0300 Subject: [PATCH 5/5] Fix typo in comment --- zebrad/src/components/mempool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 442e4486b89..64b05f72591 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -83,7 +83,7 @@ enum ActiveState { /// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to /// inject transactions into `storage`, as transactions must be verified beforehand. storage: storage::Storage, - /// The transaction dowload and verify stream. + /// The transaction download and verify stream. tx_downloads: Pin>, }, } @@ -278,7 +278,7 @@ impl Service for Mempool { ActiveState::Disabled => { // When the mempool is disabled we still return that the service is ready. // Otherwise, callers could block waiting for the mempool to be enabled, - // which may not be the desired behaviour. + // which may not be the desired behavior. } }