Skip to content

Commit

Permalink
test(network): inbound messages are forwarded to the registry
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Feb 7, 2022
1 parent f3e85c1 commit 09d0f12
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 25 deletions.
9 changes: 5 additions & 4 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ mod minimum_peer_version;

#[cfg(any(test, feature = "proptest-impl"))]
pub use client::tests::ClientTestHarness;
#[cfg(not(test))]
use client::ClientRequest;

#[cfg(test)]
pub(crate) use client::tests::ReceiveRequestAttempt;
#[cfg(test)]
pub(crate) use client::{tests::ReceiveRequestAttempt, ClientRequest};
pub(crate) use handshake::register_inventory_status;

use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};

pub(crate) use client::CancelHeartbeatTask;
pub(crate) use client::{CancelHeartbeatTask, ClientRequest};

pub use client::Client;
pub use connection::Connection;
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ where
}

/// Register any advertised or missing inventory in `msg` for `connected_addr`.
async fn register_inventory_status(
pub(crate) async fn register_inventory_status(
msg: Result<Message, SerializationError>,
connected_addr: ConnectedAddr,
inv_collector: broadcast::Sender<InventoryChange>,
Expand Down
19 changes: 19 additions & 0 deletions zebra-network/src/peer_set/inventory_registry/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
//! Tests for the inventory registry.
use tokio::sync::broadcast;

use crate::peer_set::inventory_registry::{InventoryChange, InventoryRegistry};

mod prop;
mod vectors;

/// The number of changes that can be pending in the inventory channel, before it starts lagging.
///
/// Lagging drops messages, so tests should avoid filling the channel.
pub const MAX_PENDING_CHANGES: usize = 32;

/// Returns a newly initialised inventory registry, and a sender for its inventory channel.
fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender<InventoryChange>) {
let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES);

let inv_registry = InventoryRegistry::new(inv_stream_rx);

(inv_registry, inv_stream_tx)
}
140 changes: 140 additions & 0 deletions zebra-network/src/peer_set/inventory_registry/tests/prop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! Randomised property tests for the inventory registry.
use std::{collections::HashSet, net::SocketAddr};

use proptest::prelude::*;

use crate::{
peer::{register_inventory_status, ConnectedAddr},
peer_set::inventory_registry::{
tests::{new_inv_registry, MAX_PENDING_CHANGES},
InventoryMarker,
},
protocol::external::{InventoryHash, Message},
};

use InventoryHash::*;

/// The maximum number of random hashes we want to use in these tests.
pub const MAX_HASH_COUNT: usize = 5;

proptest! {
/// Check inventory registration works via the inbound peer message channel wrapper.
#[test]
fn inv_registry_inbound_wrapper_ok(
status in any::<InventoryMarker>(),
test_hashes in prop::collection::hash_set(any::<InventoryHash>(), 0..=MAX_HASH_COUNT)
) {
prop_assert!(MAX_HASH_COUNT <= MAX_PENDING_CHANGES, "channel must not block in tests");

// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();

runtime.block_on(async move {
// Check all combinations of:
//
// Inventory availability:
// - Available
// - Missing
//
// Inventory multiplicity:
// - Empty messages are ignored without errors
// - Single is handled correctly
// - Multiple are handled correctly
//
// And inventory variants:
// - Block (empty and single only)
// - Tx for legacy v4 and earlier transactions
// - Wtx for v5 and later transactions
//
// Using randomised proptest inventory data.
inv_registry_inbound_wrapper_with(status, test_hashes).await;
})
}
}

/// Check inventory registration works via the inbound peer message channel wrapper.
#[tracing::instrument]
async fn inv_registry_inbound_wrapper_with(
status: InventoryMarker,
test_hashes: HashSet<InventoryHash>,
) {
let test_peer: SocketAddr = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_peer = ConnectedAddr::new_inbound_direct(test_peer);

let test_hashes: Vec<InventoryHash> = test_hashes.into_iter().collect();
let test_msg = if status.is_available() {
Message::Inv(test_hashes.clone())
} else {
Message::NotFound(test_hashes.clone())
};

let (mut inv_registry, inv_stream_tx) = new_inv_registry();

let forwarded_msg =
register_inventory_status(Ok(test_msg.clone()), test_peer, inv_stream_tx.clone()).await;
assert_eq!(
test_msg.clone(),
forwarded_msg.expect("unexpected forwarded error result"),
);

inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");

let test_peer = test_peer
.get_transient_addr()
.expect("unexpected None: expected Some transient peer address");

for &test_hash in test_hashes.iter() {
// The registry should ignore these unsupported types.
// (Currently, it panics if they are registered, but they are ok to query.)
if matches!(test_hash, Error | FilteredBlock(_)) {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
continue;
}

if status.is_available() {
// register_inventory_status doesn't register multi-block available inventory,
// for performance reasons.
if test_hashes.len() > 1 && test_hash.block_hash().is_some() {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
continue;
}

assert_eq!(
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
"unexpected None hash: {:?},\n\
in message {:?} \n\
with length {}\n\
should be advertised\n",
test_hash,
test_msg,
test_hashes.len(),
);
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
} else {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
"unexpected None hash: {:?},\n\
in message {:?} \n\
with length {}\n\
should be advertised\n",
test_hash,
test_msg,
test_hashes.len(),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
}
}
}
21 changes: 1 addition & 20 deletions zebra-network/src/peer_set/inventory_registry/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
//! Fixed test vectors for the inventory registry.
use tokio::sync::broadcast;

use zebra_chain::block;

use crate::{
peer_set::{
inventory_registry::{InventoryRegistry, InventoryStatus},
InventoryChange,
},
peer_set::inventory_registry::{tests::new_inv_registry, InventoryStatus},
protocol::external::InventoryHash,
};

/// The number of changes that can be pending in the inventory channel, before it starts lagging.
///
/// Lagging drops messages, so tests should avoid filling the channel.
pub const MAX_PENDING_CHANGES: usize = 32;

/// Check an empty inventory registry works as expected.
#[tokio::test]
async fn inv_registry_empty_ok() {
Expand Down Expand Up @@ -192,12 +182,3 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}

/// Returns a newly initialised inventory registry, and a sender for its inventory channel.
fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender<InventoryChange>) {
let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES);

let inv_registry = InventoryRegistry::new(inv_stream_rx);

(inv_registry, inv_stream_tx)
}

0 comments on commit 09d0f12

Please sign in to comment.