Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Disconnect peers #1484

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {
}

/// Gives access to identify's handler.
pub fn identify(&self) -> &IdentifyHandler {
pub fn _identify(&self) -> &IdentifyHandler {
&self.identify_handler
}
}
Expand Down
11 changes: 4 additions & 7 deletions beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
fn inject_event(&mut self, event: Self::InEvent) {
match event {
BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev),
/* Events comming from the behaviour */
/* Events coming from the behaviour */
BehaviourHandlerIn::Shutdown(last_message) => {
self.shutting_down = true;
self.delegate.rpc_mut().shutdown(last_message);
Expand Down Expand Up @@ -113,12 +113,9 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
>,
> {
// Disconnect if the sub-handlers are ready.
if self.shutting_down {
let rpc_keep_alive = self.delegate.rpc().connection_keep_alive();
let identify_keep_alive = self.delegate.identify().connection_keep_alive();
if KeepAlive::No == rpc_keep_alive.max(identify_keep_alive) {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected));
}
// Currently we only respect the RPC handler.
if self.shutting_down && KeepAlive::No == self.delegate.rpc().connection_keep_alive() {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected));
}

match self.delegate.poll(cx) {
Expand Down
35 changes: 18 additions & 17 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,25 +694,35 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
conn_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// If the peer manager (and therefore the behaviour's) believe this peer connected, inform
// about the disconnection.
if self.network_globals.peers.read().is_connected(&peer_id) {
return;
}
delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint);
}

// This gets called once there are no more active connections.
fn inject_disconnected(&mut self, peer_id: &PeerId) {
// If the application/behaviour layers thinks this peer has connected inform it of the disconnect.
if self.network_globals.peers.read().is_connected(&peer_id) {
// Inform the application.
self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone()));
// Inform the behaviour.
delegate_to_behaviours!(self, inject_disconnected, peer_id);
}
// Inform the peer manager.
// NOTE: It may be the case that a rejected node, due to too many peers is disconnected
// here and the peer manager has no knowledge of its connection. We insert it here for
// reference so that peer manager can track this peer.
self.peer_manager.notify_disconnect(&peer_id);
// Inform the application.
self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone()));

// Update the prometheus metrics
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(
&metrics::PEERS_CONNECTED,
self.network_globals.connected_peers() as i64,
);

// Inform the behaviour.
delegate_to_behaviours!(self, inject_disconnected, peer_id);
}

// This gets called every time a connection is established.
Expand Down Expand Up @@ -741,6 +751,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
};

if goodbye_reason.is_some() {
debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.as_ref().expect("Is some").to_string());
self.peers_to_dc
.push_back((peer_id.clone(), goodbye_reason));
return;
Expand Down Expand Up @@ -771,18 +782,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {

// This gets called on the initial connection establishment.
fn inject_connected(&mut self, peer_id: &PeerId) {
// Drop any connection from a banned peer. The goodbye and disconnects are handled in
// `inject_connection_established()`, which gets called first.
// The same holds if we reached the peer limit and the connected peer has no future duty.
if self.peer_manager.is_banned(peer_id)
|| (self.peer_manager.peer_limit_reached()
&& self
.network_globals
.peers
.read()
.peer_info(peer_id)
.map_or(true, |i| !i.has_future_duty()))
{
// If the PeerManager has connected this peer, inform the behaviours
if !self.network_globals.peers.read().is_connected(&peer_id) {
return;
}

Expand Down
23 changes: 14 additions & 9 deletions beacon_node/eth2_libp2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,20 @@ where
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too.
let should_shutdown = if let HandlerState::ShuttingDown(_) = self.state {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
} else {
false
let should_shutdown = match self.state {
HandlerState::ShuttingDown(_) => {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
}
HandlerState::Deactivated => {
// Regardless of events, the timeout has expired. Force the disconnect.
true
}
_ => false,
};

match self.keep_alive {
Expand Down