Skip to content

Commit

Permalink
mitigate too many outgoing connections
Browse files Browse the repository at this point in the history
  • Loading branch information
divagant-martian committed Aug 11, 2020
1 parent 134676f commit 1197722
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 58 deletions.
4 changes: 2 additions & 2 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,8 +743,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
.peer_info(peer_id)
.map_or(true, |i| !i.has_future_duty())
{
//If we are at our peer limit and we don't need the peer for a future validator
//duty, send goodbye with reason TooManyPeers
// If we are at our peer limit and we don't need the peer for a future validator
// duty, send goodbye with reason TooManyPeers
Some(GoodbyeReason::TooManyPeers)
} else {
None
Expand Down
79 changes: 23 additions & 56 deletions beacon_node/eth2_libp2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ where
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
max_dial_negotiated: u32,

/// Value to return from `connection_keep_alive`.
keep_alive: KeepAlive,

/// State of the handler.
state: HandlerState,

Expand Down Expand Up @@ -243,7 +240,6 @@ where
current_outbound_substream_id: SubstreamId(0),
state: HandlerState::Active,
max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
outbound_io_error_retries: 0,
log: log.clone(),
}
Expand Down Expand Up @@ -287,15 +283,13 @@ where
TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64),
));
}
self.update_keep_alive();
}

/// Opens an outbound substream with a request.
fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
self.update_keep_alive();
}
_ => {
self.pending_errors.push(HandlerErr::Outbound {
Expand Down Expand Up @@ -338,43 +332,6 @@ where
}
inbound_info.pending_items.push(response);
}

/// Updates the `KeepAlive` returned by `connection_keep_alive`.
///
/// The handler stays alive as long as there are inbound/outbound substreams established and no
/// items dialing/to be dialed. Otherwise it is given a grace period of inactivity of
/// `self.inactive_timeout`.
fn update_keep_alive(&mut self) {
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too.
let should_shutdown = match self.state {
HandlerState::ShuttingDown(_) => {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
}
HandlerState::Deactivated => {
// Regardless of events, the timeout has expired. Force the disconnect.
true
}
_ => false,
};

match self.keep_alive {
KeepAlive::Yes if should_shutdown => self.keep_alive = KeepAlive::No,
KeepAlive::Yes => {} // We continue being active
KeepAlive::Until(_) if should_shutdown => self.keep_alive = KeepAlive::No, // Already deemed inactive
KeepAlive::Until(_) => {
// No longer idle
self.keep_alive = KeepAlive::Yes;
}
KeepAlive::No => {} // currently not used
}
}
}

impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
Expand Down Expand Up @@ -427,8 +384,6 @@ where
self.events_out
.push(RPCReceived::Request(self.current_inbound_substream_id, req));
self.current_inbound_substream_id.0 += 1;

self.update_keep_alive();
}

fn inject_fully_negotiated_outbound(
Expand Down Expand Up @@ -486,8 +441,6 @@ where
}
self.current_outbound_substream_id.0 += 1;
}

self.update_keep_alive();
}

fn inject_event(&mut self, rpc_event: Self::InEvent) {
Expand Down Expand Up @@ -515,7 +468,6 @@ where

// This dialing is now considered failed
self.dial_negotiated -= 1;
self.update_keep_alive();

self.outbound_io_error_retries = 0;
// map the error
Expand Down Expand Up @@ -548,7 +500,29 @@ where
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too.
let should_shutdown = match self.state {
HandlerState::ShuttingDown(_) => {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
}
HandlerState::Deactivated => {
// Regardless of events, the timeout has expired. Force the disconnect.
true
}
_ => false,
};
if should_shutdown {
KeepAlive::No
} else {
KeepAlive::Yes
}
}

fn poll(
Expand Down Expand Up @@ -624,8 +598,6 @@ where
if let Some(OutboundInfo { proto, req_id, .. }) =
self.outbound_substreams.remove(outbound_id.get_ref())
{
self.update_keep_alive();

let outbound_err = HandlerErr::Outbound {
id: req_id,
proto,
Expand Down Expand Up @@ -724,7 +696,6 @@ where
self.inbound_substreams.remove(&inbound_id);
}

self.update_keep_alive();
// drive outbound streams that need to be processed
for outbound_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
// get the state and mark it as poisoned
Expand Down Expand Up @@ -813,7 +784,6 @@ where
let request_id = entry.get().req_id;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
self.update_keep_alive();
// notify the application error
if request.expected_responses() > 1 {
// return an end of stream result
Expand Down Expand Up @@ -844,7 +814,6 @@ where
error: e,
};
entry.remove_entry();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
}
},
Expand All @@ -857,7 +826,6 @@ where
let request_id = entry.get().req_id;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
self.update_keep_alive();

// report the stream termination to the user
//
Expand Down Expand Up @@ -894,7 +862,6 @@ where
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),
Expand Down
1 change: 1 addition & 0 deletions beacon_node/eth2_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
.notify_handler_buffer_size(std::num::NonZeroUsize::new(32).expect("Not zero"))
.connection_event_buffer_size(64)
.incoming_connection_limit(10)
.outgoing_connection_limit(config.target_peers * 2)
.peer_connection_limit(MAX_CONNECTIONS_PER_PEER)
.executor(Box::new(Executor(executor)))
.build()
Expand Down

0 comments on commit 1197722

Please sign in to comment.