diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 38e76e20de4..b70fb668e91 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -540,6 +540,15 @@ impl TipAction { } } + /// Returns the block height of this tip action, + /// regardless of the underlying variant. + pub fn best_tip_height(&self) -> block::Height { + match self { + Grow { block } => block.height, + Reset { height, .. } => *height, + } + } + /// Returns a [`Grow`] based on `block`. pub(crate) fn grow_with(block: ChainTipBlock) -> Self { Grow { block } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 44895768f13..5154843d1cd 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -33,6 +33,7 @@ use tower::util::BoxService; use crate::{ components::{ mempool::{self, Mempool}, + sync, tokio::{RuntimeRun, TokioComponent}, ChainSync, Inbound, }, @@ -105,11 +106,26 @@ impl StartCmd { .send((peer_set.clone(), address_book, mempool.clone())) .map_err(|_| eyre!("could not send setup data to inbound service"))?; + let syncer_error_future = syncer.sync(); + + let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change, + peer_set.clone(), + )); + + let mempool_crawler_task_handle = mempool::Crawler::spawn(peer_set, mempool, sync_status); + select! { - result = syncer.sync().fuse() => result, - _ = mempool::Crawler::spawn(peer_set, mempool, sync_status).fuse() => { - unreachable!("The mempool crawler only stops if it panics"); - } + sync_result = syncer_error_future.fuse() => sync_result, + + sync_gossip_result = sync_gossip_task_handle.fuse() => sync_gossip_result + .expect("unexpected panic in the chain tip block gossip task") + .map_err(|e| eyre!(e)), + + mempool_crawl_result = mempool_crawler_task_handle.fuse() => mempool_crawl_result + .expect("unexpected panic in the mempool crawler") + .map_err(|e| eyre!(e)), } } } diff --git a/zebrad/src/components.rs b/zebrad/src/components.rs index 29e97d226e1..59c62691dd0 100644 --- a/zebrad/src/components.rs +++ b/zebrad/src/components.rs @@ -8,7 +8,7 @@ mod inbound; pub mod mempool; pub mod metrics; -mod sync; +pub mod sync; pub mod tokio; pub mod tracing; diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index ea20ffa4ddd..29538fb6ff9 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -1,38 +1,42 @@ use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc}; -use super::mempool::{unmined_transactions_in_blocks, Mempool}; -use crate::components::sync::SyncStatus; - use futures::FutureExt; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinHandle}; use tower::{ buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service, ServiceExt, }; - use tracing::Span; + use zebra_chain::{ block::Block, parameters::Network, serialization::ZcashDeserializeInto, transaction::{UnminedTx, UnminedTxId}, }; - use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; use zebra_network::{AddressBook, Request, Response}; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; +use crate::components::{ + mempool::{unmined_transactions_in_blocks, Mempool}, + sync::{self, BlockGossipError, SyncStatus}, +}; + #[tokio::test] async fn mempool_requests_for_transactions() { - let (inbound_service, added_transactions, _, mut peer_set, _) = setup(true).await; + let ( + inbound_service, + _committed_blocks, + added_transactions, + _mock_tx_verifier, + mut peer_set, + _state_guard, + sync_gossip_task_handle, + ) = setup(true).await; - let added_transaction_ids: Vec = added_transactions - .clone() - .unwrap() - .iter() - .map(|t| t.id) - .collect(); + let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); // Test `Request::MempoolTransactionIds` let response = inbound_service @@ -58,11 +62,19 @@ async fn mempool_requests_for_transactions() { .await; match response { - Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()), + Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions), _ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec)`"), }; + // check that nothing unexpected happened peer_set.expect_no_requests().await; + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); } #[tokio::test] @@ -74,7 +86,15 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { // use the first transaction that is not coinbase let tx = block.transactions[1].clone(); - let (inbound_service, _, mut tx_verifier, mut peer_set, _) = setup(false).await; + let ( + inbound_service, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + _state_guard, + sync_gossip_task_handle, + ) = setup(false).await; // Test `Request::PushTransaction` let request = inbound_service @@ -104,8 +124,16 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { ), }; + // check that nothing unexpected happened peer_set.expect_no_requests().await; + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + Ok(()) } @@ -123,7 +151,15 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { let test_transaction_id = test_transaction.unmined_id(); let txs = HashSet::from_iter([test_transaction_id]); - let (inbound_service, _, mut tx_verifier, mut peer_set, _) = setup(false).await; + let ( + inbound_service, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + _state_guard, + sync_gossip_task_handle, + ) = setup(false).await; // Test `Request::AdvertiseTransactionIds` let request = inbound_service @@ -164,8 +200,16 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { ), }; + // check that nothing unexpected happened peer_set.expect_no_requests().await; + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + Ok(()) } @@ -187,7 +231,15 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { let mut tx2_id = tx2.unmined_id(); // Get services - let (inbound_service, _, mut tx_verifier, _peer_set, state_service) = setup(false).await; + let ( + inbound_service, + _committed_blocks, + _added_transactions, + mut tx_verifier, + mut peer_set, + state_service, + sync_gossip_task_handle, + ) = setup(false).await; // Push test transaction let request = inbound_service @@ -220,17 +272,22 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { }; // Add a new block to the state (make the chain tip advance) - let block_one: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + let block_two: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES .zcash_deserialize_into() .unwrap(); state_service .clone() .oneshot(zebra_state::Request::CommitFinalizedBlock( - block_one.clone().into(), + block_two.clone().into(), )) .await .unwrap(); + peer_set + .expect_request(Request::AdvertiseBlock(block_two.hash())) + .await + .respond(Response::Nil); + // Make sure tx1 is still in the mempool as it is not expired yet. let request = inbound_service .clone() @@ -247,17 +304,22 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { }; // As our test transaction will expire at a block height greater or equal to 3 we need to push block 3. - let block_two: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES + let block_three: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES .zcash_deserialize_into() .unwrap(); state_service .clone() .oneshot(zebra_state::Request::CommitFinalizedBlock( - block_two.clone().into(), + block_three.clone().into(), )) .await .unwrap(); + peer_set + .expect_request(Request::AdvertiseBlock(block_three.hash())) + .await + .respond(Response::Nil); + // Push a second transaction to trigger `remove_expired_transactions()` let request = inbound_service .clone() @@ -322,6 +384,11 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { .await .unwrap(); + peer_set + .expect_request(Request::AdvertiseBlock(block.hash())) + .await + .respond(Response::Nil); + let request = inbound_service .clone() .oneshot(Request::MempoolTransactionIds) @@ -338,6 +405,16 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { }; } + // check that nothing unexpected happened + peer_set.expect_no_requests().await; + + let sync_gossip_result = sync_gossip_task_handle.now_or_never(); + assert!( + matches!(sync_gossip_result, None), + "unexpected error or panic in sync gossip task: {:?}", + sync_gossip_result, + ); + Ok(()) } @@ -345,7 +422,8 @@ async fn setup( add_transactions: bool, ) -> ( LoadShed>, - Option>, + Vec>, + Vec, MockService, MockService, Buffer< @@ -356,6 +434,7 @@ async fn setup( >, zebra_state::Request, >, + JoinHandle>, ) { let network = Network::Mainnet; let consensus_config = ConsensusConfig::default(); @@ -372,12 +451,14 @@ async fn setup( zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) .await; - let peer_set = MockService::build().for_unit_tests(); + let mut peer_set = MockService::build().for_unit_tests(); let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10); let mock_tx_verifier = MockService::build().for_unit_tests(); let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10); + let mut committed_blocks = Vec::new(); + // Push the genesis block to the state. // This must be done before creating the mempool to avoid `chain_tip_change` // returning "reset" which would clear the mempool. @@ -393,6 +474,7 @@ async fn setup( )) .await .unwrap(); + committed_blocks.push(genesis_block); // Also push block 1. // Block one is a network upgrade and the mempool will be cleared at it, @@ -407,23 +489,24 @@ async fn setup( )) .await .unwrap(); + committed_blocks.push(block_one); let mut mempool_service = Mempool::new( network, buffered_peer_set.clone(), state_service.clone(), buffered_tx_verifier.clone(), - sync_status, + sync_status.clone(), latest_chain_tip, - chain_tip_change, + chain_tip_change.clone(), ); // Enable the mempool let _ = mempool_service.enable(&mut recent_syncs).await; - let mut added_transactions = None; + let mut added_transactions = Vec::new(); if add_transactions { - added_transactions = Some(add_some_stuff_to_mempool(&mut mempool_service, network)); + added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network)); } let mempool_service = BoxService::new(mempool_service); @@ -442,14 +525,33 @@ async fn setup( let r = setup_tx.send((buffered_peer_set, address_book, mempool)); // We can't expect or unwrap because the returned Result does not implement Debug - assert!(r.is_ok()); + assert!(r.is_ok(), "unexpected setup channel send failure"); + + let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change, + peer_set.clone(), + )); + + // Make sure there is an additional request broadcasting the + // committed blocks to peers. + // + // (The genesis block gets skipped, because block 1 is committed before the task is spawned.) + for block in committed_blocks.iter().skip(1) { + peer_set + .expect_request(Request::AdvertiseBlock(block.hash())) + .await + .respond(Response::Nil); + } ( inbound_service, + committed_blocks, added_transactions, mock_tx_verifier, peer_set, state_service, + sync_gossip_task_handle, ) } diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index 17d6e21e7e4..c93002c66bd 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -69,6 +69,8 @@ where /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when /// Zebra is shutting down. pub async fn run(mut self) -> Result<(), BoxError> { + info!("initializing mempool crawler task"); + while self.status.wait_until_close_to_tip().await.is_ok() { self.crawl_transactions().await?; sleep(RATE_LIMIT_DELAY).await; diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 435d1ea0a46..8857f01bc53 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -21,6 +21,7 @@ use zebra_state as zs; use crate::{config::ZebradConfig, BoxError}; mod downloads; +mod gossip; mod recent_sync_lengths; mod status; @@ -28,6 +29,8 @@ mod status; mod tests; use downloads::{AlwaysHedge, Downloads}; + +pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError}; pub use recent_sync_lengths::RecentSyncLengths; pub use status::SyncStatus; diff --git a/zebrad/src/components/sync/gossip.rs b/zebrad/src/components/sync/gossip.rs new file mode 100644 index 00000000000..c394504a1ae --- /dev/null +++ b/zebrad/src/components/sync/gossip.rs @@ -0,0 +1,85 @@ +//! A task that gossips newly verified [`block::Hash`]es to peers. + +use thiserror::Error; +use tokio::sync::watch; +use tower::{timeout::Timeout, Service, ServiceExt}; + +use zebra_network as zn; +use zebra_state::ChainTipChange; + +use crate::BoxError; + +use super::{SyncStatus, TIPS_RESPONSE_TIMEOUT}; + +use BlockGossipError::*; + +/// Errors that can occur when gossiping committed blocks +#[derive(Error, Debug)] +pub enum BlockGossipError { + #[error("chain tip sender was dropped")] + TipChange(watch::error::RecvError), + + #[error("sync status sender was dropped")] + SyncStatus(watch::error::RecvError), + + #[error("permanent peer set failure")] + PeerSetReadiness(zn::BoxError), +} + +/// Run continuously, gossiping newly verified [`block::Hash`]es to peers. +/// +/// Once the state has reached the chain tip, broadcast the [`block::Hash`]es +/// of newly verified blocks to all ready peers. +/// +/// Blocks are only gossiped if they are: +/// - on the best chain, and +/// - the most recent block verified since the last gossip. +/// +/// In particular, if a lot of blocks are committed at the same time, +/// gossips will be disabled or skipped until the state reaches the latest tip. +pub async fn gossip_best_tip_block_hashes( + mut sync_status: SyncStatus, + mut chain_state: ChainTipChange, + broadcast_network: ZN, +) -> Result<(), BlockGossipError> +where + ZN: Service + Send + Clone + 'static, + ZN::Future: Send, +{ + info!("initializing block gossip task"); + + // use the same timeout as tips requests, + // so broadcasts don't delay the syncer too long + let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT); + + loop { + // wait for at least one tip change, to make sure we have a new block hash to broadcast + let tip_action = chain_state.wait_for_tip_change().await.map_err(TipChange)?; + + // wait until we're close to the tip, because broadcasts are only useful for nodes near the tip + // (if they're a long way from the tip, they use the syncer and block locators) + sync_status + .wait_until_close_to_tip() + .await + .map_err(SyncStatus)?; + + // get the latest tip change - it might be different to the change we awaited, + // because the syncer might take a long time to reach the tip + let tip_action = chain_state.last_tip_change().unwrap_or(tip_action); + + // block broadcasts inform other nodes about new blocks, + // so our internal Grow or Reset state doesn't matter to them + let request = zn::Request::AdvertiseBlock(tip_action.best_tip_hash()); + + let height = tip_action.best_tip_height(); + info!(?height, ?request, "sending committed block broadcast"); + + // broadcast requests don't return errors, and we'd just want to ignore them anyway + let _ = broadcast_network + .ready_and() + .await + .map_err(PeerSetReadiness)? + .call(request) + .await; + } +}