Skip to content

Commit

Permalink
test: add tests for SnapshotHostInfo broadcast behavior (#10204)
Browse files Browse the repository at this point in the history
Add a few tests which test that `SnapshotHostInfo` is properly broadcast
between peers.

`SnapshotHostInfo` lets other peers know that a certain peer has a state
snapshot at a specific point in time, which is later used for state
sync. Nodes publish this information periodically and it should be
broadcast to all nodes in the network.

Fixes: #10172
  • Loading branch information
jancionear authored Nov 27, 2023
1 parent f64c064 commit 0d64bc4
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 8 deletions.
46 changes: 46 additions & 0 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use crate::accounts_data::AccountDataCacheSnapshot;
use crate::broadcast;
use crate::config;
use crate::network_protocol::testonly as data;
use crate::network_protocol::SnapshotHostInfo;
use crate::network_protocol::SyncSnapshotHosts;
use crate::network_protocol::{
EdgeState, Encoding, PeerInfo, PeerMessage, SignedAccountData, SyncAccountsData,
};
use crate::peer;
use crate::peer::peer_actor::ClosingReason;
use crate::peer_manager::network_state::NetworkState;
use crate::peer_manager::peer_manager_actor::Event as PME;
use crate::snapshot_hosts::SnapshotHostsCache;
use crate::tcp;
use crate::test_utils;
use crate::testonly::actix::ActixSystem;
Expand All @@ -28,6 +31,14 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Each actix arbiter (in fact, the underlying tokio runtime) creates 4 file descriptors:
/// 1. eventfd2()
/// 2. epoll_create1()
/// 3. fcntl() duplicating one end of some globally shared socketpair()
/// 4. fcntl() duplicating epoll socket created in (2)
/// This gives 5 file descriptors per PeerActor (4 + 1 TCP socket).
pub(crate) const FDS_PER_PEER: usize = 5;

#[derive(actix::Message)]
#[rtype("()")]
struct WithNetworkState(
Expand Down Expand Up @@ -67,6 +78,16 @@ pub(crate) fn unwrap_sync_accounts_data_processed(ev: Event) -> Option<SyncAccou
}
}

pub(crate) fn unwrap_sync_snapshot_hosts_data_processed(ev: Event) -> Option<SyncSnapshotHosts> {
match ev {
Event::PeerManager(PME::MessageProcessed(
tcp::Tier::T2,
PeerMessage::SyncSnapshotHosts(msg),
)) => Some(msg),
_ => None,
}
}

pub(crate) fn make_chain_info(
chain: &data::Chain,
validators: &[&config::NetworkConfig],
Expand Down Expand Up @@ -409,6 +430,31 @@ impl ActorHandler {
.await
}

// Awaits until the snapshot_hosts state satisfies predicate `pred`.
pub async fn wait_for_snapshot_hosts_pred(
&self,
pred: impl Fn(Arc<SnapshotHostsCache>) -> bool,
) {
let mut events = self.events.from_now();
loop {
let got = self.with_state(move |s| async move { s.snapshot_hosts.clone() }).await;
if pred(got) {
break;
}

// If the state doesn't match, wait until the next snapshot_hosts event is processed and check again.
events.recv_until(unwrap_sync_snapshot_hosts_data_processed).await;
}
}

// Awaits until the snapshot_hosts state matches `want`.
pub async fn wait_for_snapshot_hosts(&self, want: &HashSet<Arc<SnapshotHostInfo>>) {
self.wait_for_snapshot_hosts_pred(|cache| {
&cache.get_hosts().into_iter().collect::<HashSet<_>>() == want
})
.await
}

pub async fn wait_for_direct_connection(&self, target_peer_id: PeerId) {
let mut events = self.events.from_now();
loop {
Expand Down
9 changes: 1 addition & 8 deletions chain/network/src/peer_manager/tests/accounts_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,12 @@ use itertools::Itertools;
use near_async::time;
use near_o11y::testonly::init_test_logger;
use near_store::db::TestDB;
use peer_manager::testonly::FDS_PER_PEER;
use pretty_assertions::assert_eq;
use rand::seq::SliceRandom as _;
use std::collections::HashSet;
use std::sync::Arc;

/// Each actix arbiter (in fact, the underlying tokio runtime) creates 4 file descriptors:
/// 1. eventfd2()
/// 2. epoll_create1()
/// 3. fcntl() duplicating one end of some globally shared socketpair()
/// 4. fcntl() duplicating epoll socket created in (2)
/// This gives 5 file descriptors per PeerActor (4 + 1 TCP socket).
const FDS_PER_PEER: usize = 5;

#[tokio::test]
async fn broadcast() {
init_test_logger();
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer_manager/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ mod accounts_data;
mod connection_pool;
mod nonce;
mod routing;
mod snapshot_hosts;
mod tier1;
mod tier2;
Loading

0 comments on commit 0d64bc4

Please sign in to comment.