From 0364dec91d7cc4998bac5e49519d2fcd2f8c700b Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Fri, 23 Feb 2024 02:42:36 +0100 Subject: [PATCH 1/7] Add poll metrics to network manager --- crates/net/network/src/manager.rs | 637 ++++++++++++--------- crates/net/network/src/metrics.rs | 40 +- crates/net/network/src/transactions/mod.rs | 15 +- 3 files changed, 399 insertions(+), 293 deletions(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 22037faf8093..a38ce6c80601 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -18,6 +18,7 @@ use crate::{ config::NetworkConfig, discovery::Discovery, + duration_metered_exec, error::{NetworkError, ServiceKind}, eth_requests::IncomingEthRequest, import::{BlockImport, BlockImportOutcome, BlockValidation}, @@ -56,6 +57,7 @@ use std::{ Arc, }, task::{Context, Poll}, + time::{Duration, Instant}, }; use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -147,6 +149,19 @@ impl NetworkManager { pub fn secret_key(&self) -> SecretKey { self.swarm.sessions().secret_key() } + + #[inline] + fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) { + let metrics = &self.metrics; + + let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations; + + // update metrics for whole poll function + metrics.duration_poll_network_manager.set(start.elapsed()); + // update poll metrics for nested items + metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); + metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64()); + } } impl NetworkManager @@ -632,6 +647,9 @@ where type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let start = Instant::now(); + let mut poll_durations = NetworkManagerPollDurations::default(); + let this = self.get_mut(); // poll new block imports @@ -639,294 +657,349 @@ where this.on_block_import_result(outcome); } - // process incoming messages from a handle - loop { - match this.from_handle_rx.poll_next_unpin(cx) { - Poll::Pending => break, - Poll::Ready(None) => { - // This is only possible if the channel was deliberately closed since we always - // have an instance of `NetworkHandle` - error!("Network message channel closed."); - return Poll::Ready(()) + let acc = &mut poll_durations.acc_network_handle; + duration_metered_exec!( + { + // process incoming messages from a handle + loop { + match this.from_handle_rx.poll_next_unpin(cx) { + Poll::Pending => break, + Poll::Ready(None) => { + // This is only possible if the channel was deliberately closed since we + // always have an instance of + // `NetworkHandle` + error!("Network message channel closed."); + return Poll::Ready(()) + } + Poll::Ready(Some(msg)) => this.on_handle_message(msg), + }; } - Poll::Ready(Some(msg)) => this.on_handle_message(msg), - }; - } + }, + acc + ); - // This loop drives the entire state of network and does a lot of work. - // Under heavy load (many messages/events), data may arrive faster than it can be processed - // (incoming messages/requests -> events), and it is possible that more data has already - // arrived by the time an internal event is processed. Which could turn this loop into a - // busy loop. Without yielding back to the executor, it can starve other tasks waiting on - // that executor to execute them, or drive underlying resources To prevent this, we - // preemptively return control when the `budget` is exhausted. 
The value itself is - // chosen somewhat arbitrarily, it is high enough so the swarm can make meaningful progress - // but low enough that this loop does not starve other tasks for too long. - // If the budget is exhausted we manually yield back control to the (coop) scheduler. This - // manual yield point should prevent situations where polling appears to be frozen. See also - // And tokio's docs on cooperative scheduling - // - // Testing has shown that this loop naturally reaches the pending state within 1-5 - // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside - // the range of what's recommended as rule of thumb. - // - let mut budget = 10; - - loop { - // advance the swarm - match this.swarm.poll_next_unpin(cx) { - Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(event)) => { - // handle event - match event { - SwarmEvent::ValidMessage { peer_id, message } => { - this.on_peer_message(peer_id, message) - } - SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { - this.on_invalid_message(peer_id, capabilities, message); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::TcpListenerClosed { remote_addr } => { - trace!(target: "net", ?remote_addr, "TCP listener closed."); - } - SwarmEvent::TcpListenerError(err) => { - trace!(target: "net", ?err, "TCP connection error."); - } - SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { - trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); - this.metrics.total_incoming_connections.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - } - SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { - trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); - this.metrics.total_outgoing_connections.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - } - SwarmEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - messages, - status, - direction, - } => { - let total_active = - this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - %client_version, - ?peer_id, - ?total_active, - kind=%direction, - peer_enode=%NodeRecord::new(remote_addr, peer_id), - "Session established" - ); - - if direction.is_incoming() { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_session_established(peer_id, remote_addr); - } - this.event_listeners.notify(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - status, - messages, - }); - } - SwarmEvent::PeerAdded(peer_id) => { - trace!(target: "net", ?peer_id, "Peer added"); - this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::PeerRemoved(peer_id) => { - trace!(target: "net", ?peer_id, "Peer dropped"); - this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { - let total_active = - this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - 
?remote_addr, - ?peer_id, - ?total_active, - ?error, - "Session disconnected" - ); - - let mut reason = None; - if let Some(ref err) = error { - // If the connection was closed due to an error, we report the peer - this.swarm.state_mut().peers_mut().on_active_session_dropped( - &remote_addr, - &peer_id, - err, - ); - reason = err.as_disconnected(); - } else { - // Gracefully disconnected - this.swarm - .state_mut() - .peers_mut() - .on_active_session_gracefully_closed(peer_id); - } - this.metrics.closed_sessions.increment(1); - // This can either be an incoming or outgoing connection which was - // closed. So we update both metrics - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - if let Some(reason) = reason { - this.disconnect_metrics.increment(reason); - } - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - this.event_listeners - .notify(NetworkEvent::SessionClosed { peer_id, reason }); - } - SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { - trace!( - target: "net", - ?remote_addr, - ?error, - "Incoming pending session failed" - ); - - if let Some(ref err) = error { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_dropped(remote_addr, err); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); + let acc = &mut poll_durations.acc_swarm; + duration_metered_exec!( + { + // This loop drives the entire state of network and does a lot of work. + // Under heavy load (many messages/events), data may arrive faster than it can be + // processed (incoming messages/requests -> events), and it is + // possible that more data has already arrived by the time an + // internal event is processed. Which could turn this loop into a + // busy loop. Without yielding back to the executor, it can starve other tasks + // waiting on that executor to execute them, or drive underlying + // resources To prevent this, we preemptively return control when + // the `budget` is exhausted. The value itself is chosen somewhat + // arbitrarily, it is high enough so the swarm can make meaningful progress + // but low enough that this loop does not starve other tasks for too long. + // If the budget is exhausted we manually yield back control to the (coop) + // scheduler. This manual yield point should prevent situations where polling appears to be frozen. See also + // And tokio's docs on cooperative scheduling + // + // Testing has shown that this loop naturally reaches the pending state within 1-5 + // iterations in << 100µs in most cases. On average it requires ~50µs, which is + // inside the range of what's recommended as rule of thumb. 
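+                // Note: this entire block is wrapped in `duration_metered_exec!`, which takes an
+                // `Instant::now()` timestamp before running it and adds the elapsed time to the
+                // accumulator passed as `acc` (here `poll_durations.acc_swarm`) once it finishes.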
+ // + let mut budget = 10; + + loop { + // advance the swarm + match this.swarm.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => break, + Poll::Ready(Some(event)) => { + // handle event + match event { + SwarmEvent::ValidMessage { peer_id, message } => { + this.on_peer_message(peer_id, message) } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_gracefully_closed(); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error, - } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?error, - "Outgoing pending session failed" - ); - - if let Some(ref err) = error { - this.swarm.state_mut().peers_mut().on_pending_session_dropped( - &remote_addr, - &peer_id, - err, - ); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); + SwarmEvent::InvalidCapabilityMessage { + peer_id, + capabilities, + message, + } => { + this.on_invalid_message(peer_id, capabilities, message); + this.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::TcpListenerClosed { remote_addr } => { + trace!(target: "net", ?remote_addr, "TCP listener closed."); + } + SwarmEvent::TcpListenerError(err) => { + trace!(target: "net", ?err, "TCP connection error."); + } + SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { + trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); + this.metrics.total_incoming_connections.increment(1); + this.metrics + .incoming_connections + .set(this.swarm.state().peers().num_inbound_connections() + as f64); + } + SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { + trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); + this.metrics.total_outgoing_connections.increment(1); + this.metrics + .outgoing_connections + .set(this.swarm.state().peers().num_outbound_connections() + as f64); + } + SwarmEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + messages, + status, + direction, + } => { + let total_active = + this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; + this.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + %client_version, + ?peer_id, + ?total_active, + kind=%direction, + peer_enode=%NodeRecord::new(remote_addr, peer_id), + "Session established" + ); + + if direction.is_incoming() { + this.swarm + .state_mut() + .peers_mut() + .on_incoming_session_established(peer_id, remote_addr); + } + this.event_listeners.notify(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + status, + messages, + }); + } + SwarmEvent::PeerAdded(peer_id) => { + trace!(target: "net", ?peer_id, "Peer added"); + this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); + this.metrics + .tracked_peers + .set(this.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::PeerRemoved(peer_id) => { + trace!(target: "net", ?peer_id, "Peer dropped"); + this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); + this.metrics + .tracked_peers + .set(this.swarm.state().peers().num_known_peers() as f64); + } + 
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { + let total_active = + this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; + this.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?total_active, + ?error, + "Session disconnected" + ); + + let mut reason = None; + if let Some(ref err) = error { + // If the connection was closed due to an error, we report + // the peer + this.swarm + .state_mut() + .peers_mut() + .on_active_session_dropped(&remote_addr, &peer_id, err); + reason = err.as_disconnected(); + } else { + // Gracefully disconnected + this.swarm + .state_mut() + .peers_mut() + .on_active_session_gracefully_closed(peer_id); + } + this.metrics.closed_sessions.increment(1); + // This can either be an incoming or outgoing connection which + // was closed. So we update + // both metrics + this.metrics + .incoming_connections + .set(this.swarm.state().peers().num_inbound_connections() + as f64); + this.metrics + .outgoing_connections + .set(this.swarm.state().peers().num_outbound_connections() + as f64); + if let Some(reason) = reason { + this.disconnect_metrics.increment(reason); + } + this.metrics.backed_off_peers.set( + this.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + this.event_listeners + .notify(NetworkEvent::SessionClosed { peer_id, reason }); + } + SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { + trace!( + target: "net", + ?remote_addr, + ?error, + "Incoming pending session failed" + ); + + if let Some(ref err) = error { + this.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_dropped(remote_addr, err); + this.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + this.disconnect_metrics.increment(reason); + } + } else { + this.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_gracefully_closed(); + } + this.metrics.closed_sessions.increment(1); + this.metrics + .incoming_connections + .set(this.swarm.state().peers().num_inbound_connections() + as f64); + this.metrics.backed_off_peers.set( + this.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingPendingSessionClosed { + remote_addr, + peer_id, + error, + } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing pending session failed" + ); + + if let Some(ref err) = error { + this.swarm + .state_mut() + .peers_mut() + .on_pending_session_dropped( + &remote_addr, + &peer_id, + err, + ); + this.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + this.disconnect_metrics.increment(reason); + } + } else { + this.swarm + .state_mut() + .peers_mut() + .on_pending_session_gracefully_closed(&peer_id); + } + this.metrics.closed_sessions.increment(1); + this.metrics + .outgoing_connections + .set(this.swarm.state().peers().num_outbound_connections() + as f64); + this.metrics.backed_off_peers.set( + this.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingConnectionError { + remote_addr, + peer_id, + error, + } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing connection error" + ); + + this.swarm + .state_mut() + .peers_mut() + .on_outgoing_connection_failure( + &remote_addr, + &peer_id, + &error, + ); + + this.metrics + .outgoing_connections + 
.set(this.swarm.state().peers().num_outbound_connections() + as f64); + this.metrics.backed_off_peers.set( + this.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::BadMessage { peer_id } => { + this.swarm.state_mut().peers_mut().apply_reputation_change( + &peer_id, + ReputationChangeKind::BadMessage, + ); + this.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::ProtocolBreach { peer_id } => { + this.swarm.state_mut().peers_mut().apply_reputation_change( + &peer_id, + ReputationChangeKind::BadProtocol, + ); } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_pending_session_gracefully_closed(&peer_id); } - this.metrics.closed_sessions.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?error, - "Outgoing connection error" - ); - - this.swarm.state_mut().peers_mut().on_outgoing_connection_failure( - &remote_addr, - &peer_id, - &error, - ); - - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() as f64); - this.metrics.backed_off_peers.set( - this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) - as f64, - ); - } - SwarmEvent::BadMessage { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadMessage, - ); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::ProtocolBreach { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadProtocol, - ); } } + + // ensure we still have enough budget for another iteration + budget -= 1; + if budget == 0 { + trace!(target: "net", budget=10, "exhausted network manager budget"); + // make sure we're woken up again + cx.waker().wake_by_ref(); + break + } } - } + }, + acc + ); - // ensure we still have enough budget for another iteration - budget -= 1; - if budget == 0 { - trace!(target: "net", budget=10, "exhausted network manager budget"); - // make sure we're woken up again - cx.waker().wake_by_ref(); - break - } - } + this.update_poll_metrics(start, poll_durations); Poll::Pending } @@ -972,3 +1045,9 @@ pub enum NetworkEvent { pub enum DiscoveredEvent { EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, } + +#[derive(Debug, Default)] +struct NetworkManagerPollDurations { + acc_network_handle: Duration, + acc_swarm: Duration, +} diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index fe3936d894fd..edd6ff615cbe 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -43,6 +43,31 @@ pub struct NetworkMetrics { /// Number of Eth Requests dropped due to channel being at full capacity pub(crate) total_dropped_eth_requests_at_full_capacity: Counter, + + /* ================ POLL DURATION ================ */ + + /* -- Total poll duration of `NetworksManager` future -- */ + /// Duration in seconds of call to + /// [`NetworkManager`](crate::NetworkManager)'s poll function. + /// + /// True duration of this call, should be sum of the accumulated durations of calling nested + // items. 
+ pub(crate) duration_poll_network_manager: Gauge, + + /* -- Poll duration of items nested in `NetworkManager` future -- */ + /// Accumulated time spent streaming messages sent over the + /// [`NetworkHandle`](crate::NetworkHandle), which can be cloned and shared via + /// [`NetworkManager::handle`](crate::NetworkManager::handle), in one call to poll the + /// [`NetworkManager`](crate::NetworkManager) future. + /// + /// Duration in seconds. + // todo: find out how many components hold the network handle. + pub(crate) acc_duration_poll_network_handle: Gauge, + /// Accumulated time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the + /// [`NetworkManager`](crate::NetworkManager) future. + /// + /// Duration in seconds. + pub(crate) acc_duration_poll_swarm: Gauge, } /// Metrics for SessionManager @@ -117,7 +142,7 @@ pub struct TransactionsManagerMetrics { /// [`TransactionsManager`](crate::transactions::TransactionsManager)'s poll function. /// /// Updating metrics could take time, so the true duration of this call could - /// be longer than the sum of the accumulated durations of polling nested streams. + /// be longer than the sum of the accumulated durations of polling nested items. pub(crate) duration_poll_tx_manager: Gauge, /* -- Poll duration of items nested in `TransactionsManager` future -- */ @@ -244,3 +269,16 @@ pub struct EthRequestHandlerMetrics { /// Number of received bodies requests pub(crate) received_bodies_requests: Counter, } + +/// Measures the duration of executing the given code block. The duration is added to the given +/// accumulator value passed as a mutable reference. +#[macro_export] +macro_rules! duration_metered_exec { + ($code:block, $acc:ident) => { + let start = Instant::now(); + + $code; + + *$acc += start.elapsed(); + }; +} diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 65727c23577f..5c091cd07cee 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -29,6 +29,7 @@ use crate::{ cache::LruCache, + duration_metered_exec, manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, @@ -333,7 +334,7 @@ where // update metrics for whole poll function metrics.duration_poll_tx_manager.set(start.elapsed()); - // update poll metrics for nested streams + // update metrics for nested items metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64()); metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64()); metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64()); @@ -1110,18 +1111,6 @@ where } } -/// Measures the duration of executing the given code block. The duration is added to the given -/// accumulator value passed as a mutable reference. -macro_rules! 
duration_metered_exec { - ($code:block, $acc:ident) => { - let start = Instant::now(); - - $code; - - *$acc += start.elapsed(); - }; -} - #[derive(Debug, Default)] struct TxManagerPollDurations { acc_network_events: Duration, From 2f3ba1797070e3ded6b4ec659fbe6f6f9077ebff Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Fri, 23 Feb 2024 03:03:59 +0100 Subject: [PATCH 2/7] Update comment --- crates/net/network/src/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index a38ce6c80601..3a1a7ce92cbc 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -652,7 +652,7 @@ where let this = self.get_mut(); - // poll new block imports + // poll new block imports (dummy) while let Poll::Ready(outcome) = this.block_import.poll(cx) { this.on_block_import_result(outcome); } From fff6946582ccf29e907bb2a42ebd5981d18e23ba Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Fri, 23 Feb 2024 03:17:15 +0100 Subject: [PATCH 3/7] Move swarm event processing to own method, improves readability --- crates/net/network/src/manager.rs | 528 ++++++++++++++---------------- 1 file changed, 248 insertions(+), 280 deletions(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 3a1a7ce92cbc..f97e19a9d159 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -610,6 +610,249 @@ where NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto), } } + + fn on_swarm_event(&mut self, event: SwarmEvent) { + // handle event + match event { + SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message), + SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => { + self.on_invalid_message(peer_id, capabilities, message); + self.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::TcpListenerClosed { remote_addr } => { + trace!(target: "net", ?remote_addr, "TCP listener closed."); + } + SwarmEvent::TcpListenerError(err) => { + trace!(target: "net", ?err, "TCP connection error."); + } + SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { + trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); + self.metrics.total_incoming_connections.increment(1); + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + } + SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { + trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); + self.metrics.total_outgoing_connections.increment(1); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + } + SwarmEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + messages, + status, + direction, + } => { + let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; + self.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + %client_version, + ?peer_id, + ?total_active, + kind=%direction, + peer_enode=%NodeRecord::new(remote_addr, peer_id), + "Session established" + ); + + if direction.is_incoming() { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_session_established(peer_id, remote_addr); + } + self.event_listeners.notify(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + version, + status, + 
messages, + }); + } + SwarmEvent::PeerAdded(peer_id) => { + trace!(target: "net", ?peer_id, "Peer added"); + self.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); + self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::PeerRemoved(peer_id) => { + trace!(target: "net", ?peer_id, "Peer dropped"); + self.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); + self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); + } + SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { + let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; + self.metrics.connected_peers.set(total_active as f64); + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?total_active, + ?error, + "Session disconnected" + ); + + let mut reason = None; + if let Some(ref err) = error { + // If the connection was closed due to an error, we report + // the peer + self.swarm.state_mut().peers_mut().on_active_session_dropped( + &remote_addr, + &peer_id, + err, + ); + reason = err.as_disconnected(); + } else { + // Gracefully disconnected + self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id); + } + self.metrics.closed_sessions.increment(1); + // This can either be an incoming or outgoing connection which + // was closed. So we update + // both metrics + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + if let Some(reason) = reason { + self.disconnect_metrics.increment(reason); + } + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + self.event_listeners.notify(NetworkEvent::SessionClosed { peer_id, reason }); + } + SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { + trace!( + target: "net", + ?remote_addr, + ?error, + "Incoming pending session failed" + ); + + if let Some(ref err) = error { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_dropped(remote_addr, err); + self.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + self.disconnect_metrics.increment(reason); + } + } else { + self.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_gracefully_closed(); + } + self.metrics.closed_sessions.increment(1); + self.metrics + .incoming_connections + .set(self.swarm.state().peers().num_inbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing pending session failed" + ); + + if let Some(ref err) = error { + self.swarm.state_mut().peers_mut().on_pending_session_dropped( + &remote_addr, + &peer_id, + err, + ); + self.metrics.pending_session_failures.increment(1); + if let Some(reason) = err.as_disconnected() { + self.disconnect_metrics.increment(reason); + } + } else { + self.swarm + .state_mut() + .peers_mut() + .on_pending_session_gracefully_closed(&peer_id); + } + self.metrics.closed_sessions.increment(1); + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() 
+ .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { + trace!( + target: "net", + ?remote_addr, + ?peer_id, + ?error, + "Outgoing connection error" + ); + + self.swarm.state_mut().peers_mut().on_outgoing_connection_failure( + &remote_addr, + &peer_id, + &error, + ); + + self.metrics + .outgoing_connections + .set(self.swarm.state().peers().num_outbound_connections() as f64); + self.metrics.backed_off_peers.set( + self.swarm + .state() + .peers() + .num_backed_off_peers() + .saturating_sub(1) + as f64, + ); + } + SwarmEvent::BadMessage { peer_id } => { + self.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage); + self.metrics.invalid_messages_received.increment(1); + } + SwarmEvent::ProtocolBreach { peer_id } => { + self.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol); + } + } + } } impl NetworkManager @@ -693,8 +936,10 @@ where // arbitrarily, it is high enough so the swarm can make meaningful progress // but low enough that this loop does not starve other tasks for too long. // If the budget is exhausted we manually yield back control to the (coop) - // scheduler. This manual yield point should prevent situations where polling appears to be frozen. See also - // And tokio's docs on cooperative scheduling + // scheduler. This manual yield point should prevent situations where polling + // appears to be frozen. See also + // And tokio's docs on cooperative scheduling + // // // Testing has shown that this loop naturally reaches the pending state within 1-5 // iterations in << 100µs in most cases. On average it requires ~50µs, which is @@ -706,284 +951,7 @@ where // advance the swarm match this.swarm.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(event)) => { - // handle event - match event { - SwarmEvent::ValidMessage { peer_id, message } => { - this.on_peer_message(peer_id, message) - } - SwarmEvent::InvalidCapabilityMessage { - peer_id, - capabilities, - message, - } => { - this.on_invalid_message(peer_id, capabilities, message); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::TcpListenerClosed { remote_addr } => { - trace!(target: "net", ?remote_addr, "TCP listener closed."); - } - SwarmEvent::TcpListenerError(err) => { - trace!(target: "net", ?err, "TCP connection error."); - } - SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => { - trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection"); - this.metrics.total_incoming_connections.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() - as f64); - } - SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => { - trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection."); - this.metrics.total_outgoing_connections.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() - as f64); - } - SwarmEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - messages, - status, - direction, - } => { - let total_active = - this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - %client_version, - ?peer_id, - ?total_active, - kind=%direction, - 
peer_enode=%NodeRecord::new(remote_addr, peer_id), - "Session established" - ); - - if direction.is_incoming() { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_session_established(peer_id, remote_addr); - } - this.event_listeners.notify(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - version, - status, - messages, - }); - } - SwarmEvent::PeerAdded(peer_id) => { - trace!(target: "net", ?peer_id, "Peer added"); - this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::PeerRemoved(peer_id) => { - trace!(target: "net", ?peer_id, "Peer dropped"); - this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); - this.metrics - .tracked_peers - .set(this.swarm.state().peers().num_known_peers() as f64); - } - SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { - let total_active = - this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; - this.metrics.connected_peers.set(total_active as f64); - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?total_active, - ?error, - "Session disconnected" - ); - - let mut reason = None; - if let Some(ref err) = error { - // If the connection was closed due to an error, we report - // the peer - this.swarm - .state_mut() - .peers_mut() - .on_active_session_dropped(&remote_addr, &peer_id, err); - reason = err.as_disconnected(); - } else { - // Gracefully disconnected - this.swarm - .state_mut() - .peers_mut() - .on_active_session_gracefully_closed(peer_id); - } - this.metrics.closed_sessions.increment(1); - // This can either be an incoming or outgoing connection which - // was closed. So we update - // both metrics - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() - as f64); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() - as f64); - if let Some(reason) = reason { - this.disconnect_metrics.increment(reason); - } - this.metrics.backed_off_peers.set( - this.swarm - .state() - .peers() - .num_backed_off_peers() - .saturating_sub(1) - as f64, - ); - this.event_listeners - .notify(NetworkEvent::SessionClosed { peer_id, reason }); - } - SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { - trace!( - target: "net", - ?remote_addr, - ?error, - "Incoming pending session failed" - ); - - if let Some(ref err) = error { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_dropped(remote_addr, err); - this.metrics.pending_session_failures.increment(1); - if let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); - } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_incoming_pending_session_gracefully_closed(); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .incoming_connections - .set(this.swarm.state().peers().num_inbound_connections() - as f64); - this.metrics.backed_off_peers.set( - this.swarm - .state() - .peers() - .num_backed_off_peers() - .saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingPendingSessionClosed { - remote_addr, - peer_id, - error, - } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?error, - "Outgoing pending session failed" - ); - - if let Some(ref err) = error { - this.swarm - .state_mut() - .peers_mut() - .on_pending_session_dropped( - &remote_addr, - &peer_id, - err, - ); - this.metrics.pending_session_failures.increment(1); - if 
let Some(reason) = err.as_disconnected() { - this.disconnect_metrics.increment(reason); - } - } else { - this.swarm - .state_mut() - .peers_mut() - .on_pending_session_gracefully_closed(&peer_id); - } - this.metrics.closed_sessions.increment(1); - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() - as f64); - this.metrics.backed_off_peers.set( - this.swarm - .state() - .peers() - .num_backed_off_peers() - .saturating_sub(1) - as f64, - ); - } - SwarmEvent::OutgoingConnectionError { - remote_addr, - peer_id, - error, - } => { - trace!( - target: "net", - ?remote_addr, - ?peer_id, - ?error, - "Outgoing connection error" - ); - - this.swarm - .state_mut() - .peers_mut() - .on_outgoing_connection_failure( - &remote_addr, - &peer_id, - &error, - ); - - this.metrics - .outgoing_connections - .set(this.swarm.state().peers().num_outbound_connections() - as f64); - this.metrics.backed_off_peers.set( - this.swarm - .state() - .peers() - .num_backed_off_peers() - .saturating_sub(1) - as f64, - ); - } - SwarmEvent::BadMessage { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadMessage, - ); - this.metrics.invalid_messages_received.increment(1); - } - SwarmEvent::ProtocolBreach { peer_id } => { - this.swarm.state_mut().peers_mut().apply_reputation_change( - &peer_id, - ReputationChangeKind::BadProtocol, - ); - } - } - } + Poll::Ready(Some(event)) => this.on_swarm_event(event), } // ensure we still have enough budget for another iteration From 5c5e4827490f6746631b2e4683937a6850fdf3ff Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Fri, 23 Feb 2024 05:43:36 +0100 Subject: [PATCH 4/7] Remove 'accumulated' from metrics for nested items, network manager not a loop --- crates/net/network/src/manager.rs | 4 ++-- crates/net/network/src/metrics.rs | 13 ++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index f97e19a9d159..257664389c09 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -159,8 +159,8 @@ impl NetworkManager { // update metrics for whole poll function metrics.duration_poll_network_manager.set(start.elapsed()); // update poll metrics for nested items - metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); - metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64()); + metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); + metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64()); } } diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index edd6ff615cbe..8c581b4350e7 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -55,19 +55,18 @@ pub struct NetworkMetrics { pub(crate) duration_poll_network_manager: Gauge, /* -- Poll duration of items nested in `NetworkManager` future -- */ - /// Accumulated time spent streaming messages sent over the - /// [`NetworkHandle`](crate::NetworkHandle), which can be cloned and shared via - /// [`NetworkManager::handle`](crate::NetworkManager::handle), in one call to poll the - /// [`NetworkManager`](crate::NetworkManager) future. + /// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which + /// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in + /// one call to poll the [`NetworkManager`](crate::NetworkManager) future. 
/// /// Duration in seconds. // todo: find out how many components hold the network handle. - pub(crate) acc_duration_poll_network_handle: Gauge, - /// Accumulated time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the + pub(crate) duration_poll_network_handle: Gauge, + /// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the /// [`NetworkManager`](crate::NetworkManager) future. /// /// Duration in seconds. - pub(crate) acc_duration_poll_swarm: Gauge, + pub(crate) duration_poll_swarm: Gauge, } /// Metrics for SessionManager From 71b2e2789e912efc3054ac6241dbddabdb1b82ca Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 27 Feb 2024 02:12:56 +0100 Subject: [PATCH 5/7] Update comment --- crates/net/network/src/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 257664389c09..21d867935de4 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -895,7 +895,7 @@ where let this = self.get_mut(); - // poll new block imports (dummy) + // poll new block imports (expected to be a noop for POS) while let Poll::Ready(outcome) = this.block_import.poll(cx) { this.on_block_import_result(outcome); } From 532d1c0318a815522d3380ab86828be1cef1a840 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 27 Feb 2024 02:14:24 +0100 Subject: [PATCH 6/7] Fix duration to secs as f64 --- crates/net/network/src/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 21d867935de4..54097a54e8f1 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -157,7 +157,7 @@ impl NetworkManager { let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations; // update metrics for whole poll function - metrics.duration_poll_network_manager.set(start.elapsed()); + metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64()); // update poll metrics for nested items metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64()); metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64()); From ce8203a817922e4153e3e0773a790d58928602cb Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 29 Feb 2024 00:00:24 +0100 Subject: [PATCH 7/7] Address review --- crates/net/network/src/manager.rs | 118 ++++++++++++++---------------- 1 file changed, 54 insertions(+), 64 deletions(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 54097a54e8f1..931c43a54f81 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -18,7 +18,6 @@ use crate::{ config::NetworkConfig, discovery::Discovery, - duration_metered_exec, error::{NetworkError, ServiceKind}, eth_requests::IncomingEthRequest, import::{BlockImport, BlockImportOutcome, BlockValidation}, @@ -900,72 +899,63 @@ where this.on_block_import_result(outcome); } - let acc = &mut poll_durations.acc_network_handle; - duration_metered_exec!( - { - // process incoming messages from a handle - loop { - match this.from_handle_rx.poll_next_unpin(cx) { - Poll::Pending => break, - Poll::Ready(None) => { - // This is only possible if the channel was deliberately closed since we - // always have an instance of - // `NetworkHandle` - error!("Network message channel closed."); - return Poll::Ready(()) - } - Poll::Ready(Some(msg)) => this.on_handle_message(msg), - }; + // process incoming messages 
from a handle
+        let start_network_handle = Instant::now();
+        loop {
+            match this.from_handle_rx.poll_next_unpin(cx) {
+                Poll::Pending => break,
+                Poll::Ready(None) => {
+                    // This is only possible if the channel was deliberately closed since we
+                    // always have an instance of `NetworkHandle`
+                    error!("Network message channel closed.");
+                    return Poll::Ready(())
+                }
+                Poll::Ready(Some(msg)) => this.on_handle_message(msg),
+            };
+        }
+
+        poll_durations.acc_network_handle = start_network_handle.elapsed();
+
+        // This loop drives the entire state of the network and does a lot of work. Under heavy
+        // load (many messages/events), data may arrive faster than it can be processed (incoming
+        // messages/requests -> events), and it is possible that more data has already arrived by
+        // the time an internal event is processed, which could turn this loop into a busy loop.
+        // Without yielding back to the executor, it can starve other tasks waiting on that
+        // executor to execute them, or drive underlying resources. To prevent this, we
+        // preemptively return control when the `budget` is exhausted. The value itself is chosen
+        // somewhat arbitrarily: it is high enough that the swarm can make meaningful progress
+        // but low enough that this loop does not starve other tasks for too long. If the budget
+        // is exhausted we manually yield back control to the (coop) scheduler. This manual yield
+        // point should prevent situations where polling appears to be frozen. See also
+        // And tokio's docs on cooperative scheduling
+        //
+        //
+        // Testing has shown that this loop naturally reaches the pending state within 1-5
+        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
+        // range of what's recommended as rule of thumb. 
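+        // Note: the time spent in the swarm loop below is not measured directly; after the loop,
+        // it is derived as the total elapsed time since `start_network_handle` minus the
+        // handle-polling duration accumulated above.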
+ // + let mut budget = 10; + + loop { + // advance the swarm + match this.swarm.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => break, + Poll::Ready(Some(event)) => this.on_swarm_event(event), + } + + // ensure we still have enough budget for another iteration + budget -= 1; + if budget == 0 { + trace!(target: "net", budget=10, "exhausted network manager budget"); + // make sure we're woken up again + cx.waker().wake_by_ref(); + break + } + } - // ensure we still have enough budget for another iteration - budget -= 1; - if budget == 0 { - trace!(target: "net", budget=10, "exhausted network manager budget"); - // make sure we're woken up again - cx.waker().wake_by_ref(); - break - } - } - }, - acc - ); + poll_durations.acc_swarm = + start_network_handle.elapsed() - poll_durations.acc_network_handle; this.update_poll_metrics(start, poll_durations);