From 09d0f124cf585a24e22d1b2da3f162ce78d2aed3 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 7 Feb 2022 12:27:10 +1000 Subject: [PATCH] test(network): inbound messages are forwarded to the registry --- zebra-network/src/peer.rs | 9 +- zebra-network/src/peer/handshake.rs | 2 +- .../src/peer_set/inventory_registry/tests.rs | 19 +++ .../peer_set/inventory_registry/tests/prop.rs | 140 ++++++++++++++++++ .../inventory_registry/tests/vectors.rs | 21 +-- 5 files changed, 166 insertions(+), 25 deletions(-) create mode 100644 zebra-network/src/peer_set/inventory_registry/tests/prop.rs diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index a230ffc1714..b434e7acd26 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -10,14 +10,15 @@ mod minimum_peer_version; #[cfg(any(test, feature = "proptest-impl"))] pub use client::tests::ClientTestHarness; -#[cfg(not(test))] -use client::ClientRequest; + +#[cfg(test)] +pub(crate) use client::tests::ReceiveRequestAttempt; #[cfg(test)] -pub(crate) use client::{tests::ReceiveRequestAttempt, ClientRequest}; +pub(crate) use handshake::register_inventory_status; use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; -pub(crate) use client::CancelHeartbeatTask; +pub(crate) use client::{CancelHeartbeatTask, ClientRequest}; pub use client::Client; pub use connection::Connection; diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 25a6b3858f4..6b6e07bcb4b 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -969,7 +969,7 @@ where } /// Register any advertised or missing inventory in `msg` for `connected_addr`. -async fn register_inventory_status( +pub(crate) async fn register_inventory_status( msg: Result, connected_addr: ConnectedAddr, inv_collector: broadcast::Sender, diff --git a/zebra-network/src/peer_set/inventory_registry/tests.rs b/zebra-network/src/peer_set/inventory_registry/tests.rs index e83d591b988..61a8619676f 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests.rs @@ -1,3 +1,22 @@ //! Tests for the inventory registry. +use tokio::sync::broadcast; + +use crate::peer_set::inventory_registry::{InventoryChange, InventoryRegistry}; + +mod prop; mod vectors; + +/// The number of changes that can be pending in the inventory channel, before it starts lagging. +/// +/// Lagging drops messages, so tests should avoid filling the channel. +pub const MAX_PENDING_CHANGES: usize = 32; + +/// Returns a newly initialised inventory registry, and a sender for its inventory channel. +fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender) { + let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES); + + let inv_registry = InventoryRegistry::new(inv_stream_rx); + + (inv_registry, inv_stream_tx) +} diff --git a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs new file mode 100644 index 00000000000..5d880d557c6 --- /dev/null +++ b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs @@ -0,0 +1,140 @@ +//! Randomised property tests for the inventory registry. + +use std::{collections::HashSet, net::SocketAddr}; + +use proptest::prelude::*; + +use crate::{ + peer::{register_inventory_status, ConnectedAddr}, + peer_set::inventory_registry::{ + tests::{new_inv_registry, MAX_PENDING_CHANGES}, + InventoryMarker, + }, + protocol::external::{InventoryHash, Message}, +}; + +use InventoryHash::*; + +/// The maximum number of random hashes we want to use in these tests. +pub const MAX_HASH_COUNT: usize = 5; + +proptest! { + /// Check inventory registration works via the inbound peer message channel wrapper. + #[test] + fn inv_registry_inbound_wrapper_ok( + status in any::(), + test_hashes in prop::collection::hash_set(any::(), 0..=MAX_HASH_COUNT) + ) { + prop_assert!(MAX_HASH_COUNT <= MAX_PENDING_CHANGES, "channel must not block in tests"); + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + runtime.block_on(async move { + // Check all combinations of: + // + // Inventory availability: + // - Available + // - Missing + // + // Inventory multiplicity: + // - Empty messages are ignored without errors + // - Single is handled correctly + // - Multiple are handled correctly + // + // And inventory variants: + // - Block (empty and single only) + // - Tx for legacy v4 and earlier transactions + // - Wtx for v5 and later transactions + // + // Using randomised proptest inventory data. + inv_registry_inbound_wrapper_with(status, test_hashes).await; + }) + } +} + +/// Check inventory registration works via the inbound peer message channel wrapper. +#[tracing::instrument] +async fn inv_registry_inbound_wrapper_with( + status: InventoryMarker, + test_hashes: HashSet, +) { + let test_peer: SocketAddr = "1.1.1.1:1" + .parse() + .expect("unexpected invalid peer address"); + let test_peer = ConnectedAddr::new_inbound_direct(test_peer); + + let test_hashes: Vec = test_hashes.into_iter().collect(); + let test_msg = if status.is_available() { + Message::Inv(test_hashes.clone()) + } else { + Message::NotFound(test_hashes.clone()) + }; + + let (mut inv_registry, inv_stream_tx) = new_inv_registry(); + + let forwarded_msg = + register_inventory_status(Ok(test_msg.clone()), test_peer, inv_stream_tx.clone()).await; + assert_eq!( + test_msg.clone(), + forwarded_msg.expect("unexpected forwarded error result"), + ); + + inv_registry + .update() + .await + .expect("unexpected dropped registry sender channel"); + + let test_peer = test_peer + .get_transient_addr() + .expect("unexpected None: expected Some transient peer address"); + + for &test_hash in test_hashes.iter() { + // The registry should ignore these unsupported types. + // (Currently, it panics if they are registered, but they are ok to query.) + if matches!(test_hash, Error | FilteredBlock(_)) { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + continue; + } + + if status.is_available() { + // register_inventory_status doesn't register multi-block available inventory, + // for performance reasons. + if test_hashes.len() > 1 && test_hash.block_hash().is_some() { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + continue; + } + + assert_eq!( + inv_registry.advertising_peers(test_hash).next(), + Some(&test_peer), + "unexpected None hash: {:?},\n\ + in message {:?} \n\ + with length {}\n\ + should be advertised\n", + test_hash, + test_msg, + test_hashes.len(), + ); + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); + } else { + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0); + assert_eq!( + inv_registry.missing_peers(test_hash).next(), + Some(&test_peer), + "unexpected None hash: {:?},\n\ + in message {:?} \n\ + with length {}\n\ + should be advertised\n", + test_hash, + test_msg, + test_hashes.len(), + ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 1); + } + } +} diff --git a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs index 448f3837e96..3d9180477fc 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs @@ -1,22 +1,12 @@ //! Fixed test vectors for the inventory registry. -use tokio::sync::broadcast; - use zebra_chain::block; use crate::{ - peer_set::{ - inventory_registry::{InventoryRegistry, InventoryStatus}, - InventoryChange, - }, + peer_set::inventory_registry::{tests::new_inv_registry, InventoryStatus}, protocol::external::InventoryHash, }; -/// The number of changes that can be pending in the inventory channel, before it starts lagging. -/// -/// Lagging drops messages, so tests should avoid filling the channel. -pub const MAX_PENDING_CHANGES: usize = 32; - /// Check an empty inventory registry works as expected. #[tokio::test] async fn inv_registry_empty_ok() { @@ -192,12 +182,3 @@ async fn inv_registry_prefer_current_order(missing_current: bool) { assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); } } - -/// Returns a newly initialised inventory registry, and a sender for its inventory channel. -fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender) { - let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES); - - let inv_registry = InventoryRegistry::new(inv_stream_rx); - - (inv_registry, inv_stream_tx) -}