Skip to content

Commit

Permalink
updated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed May 13, 2024
1 parent d57f06d commit 16b61b9
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 58 deletions.
8 changes: 4 additions & 4 deletions crates/e2e-test-utils/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use futures_util::StreamExt;
use reth::network::{NetworkEvent, NetworkEvents, NetworkHandle, PeersInfo};
use reth_primitives::NodeRecord;
use reth_tracing::tracing::info;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::wrappers::BroadcastStream;

/// Helper for network operations
pub struct NetworkTestContext {
network_events: UnboundedReceiverStream<NetworkEvent>,
network_events: BroadcastStream<NetworkEvent>,
network: NetworkHandle,
}

Expand All @@ -22,7 +22,7 @@ impl NetworkTestContext {
self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr());

match self.network_events.next().await {
Some(NetworkEvent::PeerAdded(_)) => (),
Some(Ok(NetworkEvent::PeerAdded(_))) => (),
_ => panic!("Expected a peer added event"),
}
}
Expand All @@ -35,7 +35,7 @@ impl NetworkTestContext {
/// Expects a session to be established
pub async fn expect_session(&mut self) {
match self.network_events.next().await {
Some(NetworkEvent::SessionEstablished { remote_addr, .. }) => {
Some(Ok(NetworkEvent::SessionEstablished { remote_addr, .. })) => {
info!(?remote_addr, "Session established")
}
_ => panic!("Expected session established event"),
Expand Down
20 changes: 11 additions & 9 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::wrappers::BroadcastStream;

/// A test network consisting of multiple peers.
pub struct Testnet<C, Pool> {
Expand Down Expand Up @@ -503,7 +503,7 @@ impl<Pool> PeerHandle<Pool> {
}

/// Creates a new [`NetworkEvent`] listener channel.
pub fn event_listener(&self) -> UnboundedReceiverStream<NetworkEvent> {
pub fn event_listener(&self) -> BroadcastStream<NetworkEvent> {
self.network.event_listener()
}

Expand Down Expand Up @@ -591,22 +591,24 @@ impl Default for PeerConfig {
/// This makes it easier to await established connections
#[derive(Debug)]
pub struct NetworkEventStream {
inner: UnboundedReceiverStream<NetworkEvent>,
inner: BroadcastStream<NetworkEvent>,
}

// === impl NetworkEventStream ===

impl NetworkEventStream {
/// Create a new [`NetworkEventStream`] from the given network event receiver stream.
pub fn new(inner: UnboundedReceiverStream<NetworkEvent>) -> Self {
pub fn new(inner: BroadcastStream<NetworkEvent>) -> Self {
Self { inner }
}

/// Awaits the next event for a session to be closed
pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)),
Ok(NetworkEvent::SessionClosed { peer_id, reason }) => {
return Some((peer_id, reason))
}
_ => continue,
}
}
Expand All @@ -617,7 +619,7 @@ impl NetworkEventStream {
pub async fn next_session_established(&mut self) -> Option<PeerId> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id),
Ok(NetworkEvent::SessionEstablished { peer_id, .. }) => return Some(peer_id),
_ => continue,
}
}
Expand All @@ -632,7 +634,7 @@ impl NetworkEventStream {
let mut peers = Vec::with_capacity(num);
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => {
Ok(NetworkEvent::SessionEstablished { peer_id, .. }) => {
peers.push(peer_id);
num -= 1;
if num == 0 {
Expand All @@ -650,12 +652,12 @@ impl NetworkEventStream {
/// session.
pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
let peer_id = match self.inner.next().await {
Some(NetworkEvent::PeerAdded(peer_id)) => peer_id,
Some(Ok(NetworkEvent::PeerAdded(peer_id))) => peer_id,
_ => return None,
};

match self.inner.next().await {
Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => {
Some(Ok(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. })) => {
debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}");
Some(peer_id)
}
Expand Down
50 changes: 27 additions & 23 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1725,30 +1725,31 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
}) => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
transactions.on_network_event(Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}))
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
Ok(NetworkEvent::PeerAdded(_peer_id)) => continue,
Ok(ev) => {
panic!("unexpected event {ev:?}")
}
Err(err) => panic!("unexpected error {err:?}"),
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
Expand Down Expand Up @@ -1811,30 +1812,31 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
}) => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
transactions.on_network_event(Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}))
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
Ok(NetworkEvent::PeerAdded(_peer_id)) => continue,
Ok(ev) => {
panic!("unexpected event {ev:?}")
}
Err(err) => panic!("unexpected error {err:?}"),
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
Expand Down Expand Up @@ -1895,30 +1897,31 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
}) => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
transactions.on_network_event(Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}))
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
Ok(NetworkEvent::PeerAdded(_peer_id)) => continue,
Ok(ev) => {
panic!("unexpected event {ev:?}")
}
Err(err) => panic!("unexpected error {err:?}"),
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
Expand Down Expand Up @@ -1986,27 +1989,28 @@ mod tests {
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => transactions.on_network_event(NetworkEvent::SessionEstablished {
}) => transactions.on_network_event(Ok(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
}),
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
})),
Ok(NetworkEvent::PeerAdded(_peer_id)) => continue,
Ok(ev) => {
panic!("unexpected event {ev:?}")
}
Err(err) => panic!("unexpected error {err:?}"),
}
}
handle.terminate().await;
Expand Down
42 changes: 29 additions & 13 deletions crates/net/network/tests/it/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,19 @@ async fn test_establish_connections() {
let mut established = listener0.take(4);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionClosed { .. } => {
Ok(NetworkEvent::SessionClosed { .. }) => {
panic!("unexpected event")
}
NetworkEvent::SessionEstablished { peer_id, .. } => {
Ok(NetworkEvent::SessionEstablished { peer_id, .. }) => {
assert!(expected_connections.remove(&peer_id))
}
NetworkEvent::PeerAdded(peer_id) => {
Ok(NetworkEvent::PeerAdded(peer_id)) => {
assert!(expected_peers.remove(&peer_id))
}
NetworkEvent::PeerRemoved(_) => {
Ok(NetworkEvent::PeerRemoved(_)) => {
panic!("unexpected event")
}
Err(e) => panic!("error: {e}"),
}
}
assert!(expected_connections.is_empty());
Expand Down Expand Up @@ -210,8 +211,13 @@ async fn test_connect_with_boot_nodes() {
let mut events = handle.event_listener();
tokio::task::spawn(network);

while let Some(ev) = events.next().await {
dbg!(ev);
while let Some(result_ev) = events.next().await {
match result_ev {
Ok(ev) => {
dbg!(ev);
}
Err(err) => eprintln!("{err}"),
}
}
}

Expand Down Expand Up @@ -246,8 +252,13 @@ async fn test_connect_with_builder() {
}
});

while let Some(ev) = events.next().await {
dbg!(ev);
while let Some(result_ev) = events.next().await {
match result_ev {
Ok(ev) => {
dbg!(ev);
}
Err(err) => eprintln!("{err}"),
}
}
}

Expand Down Expand Up @@ -302,8 +313,13 @@ async fn test_connect_to_trusted_peer() {

dbg!(&headers);

while let Some(ev) = events.next().await {
dbg!(ev);
while let Some(result_ev) = events.next().await {
match result_ev {
Ok(ev) => {
dbg!(ev);
}
Err(err) => eprintln!("{err}"),
}
}
}

Expand Down Expand Up @@ -493,11 +509,11 @@ async fn test_geth_disconnect() {
handle.add_peer(geth_peer_id, geth_socket);

match events.next().await {
Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id),
Some(Ok(NetworkEvent::PeerAdded(peer_id))) => assert_eq!(peer_id, geth_peer_id),
_ => panic!("Expected a peer added event"),
}

if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await {
if let Some(Ok(NetworkEvent::SessionEstablished { peer_id, .. })) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session established event");
Expand All @@ -507,7 +523,7 @@ async fn test_geth_disconnect() {
handle.disconnect_peer(geth_peer_id);

// wait for a disconnect from geth
if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await {
if let Some(Ok(NetworkEvent::SessionClosed { peer_id, .. })) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session closed event");
Expand Down
8 changes: 4 additions & 4 deletions crates/net/network/tests/it/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ async fn test_session_established_with_highest_version() {

while let Some(event) = events.next().await {
match event {
NetworkEvent::PeerAdded(peer_id) => {
Ok(NetworkEvent::PeerAdded(peer_id)) => {
assert_eq!(handle1.peer_id(), &peer_id);
}
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
Ok(NetworkEvent::SessionEstablished { peer_id, status, .. }) => {
assert_eq!(handle1.peer_id(), &peer_id);
assert_eq!(status.version, EthVersion::Eth68 as u8);
}
Expand Down Expand Up @@ -66,10 +66,10 @@ async fn test_session_established_with_different_capability() {

while let Some(event) = events.next().await {
match event {
NetworkEvent::PeerAdded(peer_id) => {
Ok(NetworkEvent::PeerAdded(peer_id)) => {
assert_eq!(handle1.peer_id(), &peer_id);
}
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
Ok(NetworkEvent::SessionEstablished { peer_id, status, .. }) => {
assert_eq!(handle1.peer_id(), &peer_id);
assert_eq!(status.version, EthVersion::Eth66 as u8);
}
Expand Down
Loading

0 comments on commit 16b61b9

Please sign in to comment.