Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix regression w.r.t. reporting of dial errors. #1493

Merged
merged 2 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ workflows:
- test
- test-wasm
- check-rustdoc-links
- integration-test

jobs:
test:
Expand Down Expand Up @@ -90,3 +91,24 @@ jobs:
- ./target
- /usr/local/cargo
- /root/.cache/sccache

integration-test:
docker:
- image: rust
- image: ipfs/go-ipfs
steps:
- checkout
- restore_cache:
key: integration-test-cache-{{ epoch }}
- run:
name: Print Rust version
command: |
rustc --version
- run:
command: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad
- save_cache:
key: integration-test-cache-{{ epoch }}
paths:
- "~/.cargo"
- "./target"

14 changes: 7 additions & 7 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -44,6 +48,8 @@ where
write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) =>
write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) =>
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
Expand All @@ -57,6 +63,7 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
Expand All @@ -71,10 +78,6 @@ pub enum PendingConnectionError<TTransErr> {
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,

/// The pending connection was successfully negotiated but dropped
/// because the connection limit for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
Expand All @@ -93,8 +96,6 @@ where
write!(f, "Pending connection: Transport error: {}", err),
PendingConnectionError::InvalidPeerId =>
write!(f, "Pending connection: Invalid peer ID."),
PendingConnectionError::ConnectionLimit(l) =>
write!(f, "Pending connection: Connection limit: {}.", l)
}
}
}
Expand All @@ -109,7 +110,6 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}
69 changes: 37 additions & 32 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
error: PendingConnectionError<TTransErr>,
/// The handler that was supposed to handle the connection,
/// if the connection failed before the handler was consumed.
handler: Option<THandler>,
handler: THandler,
/// The (expected) peer of the failed connection.
peer: Option<TPeerId>,
/// A reference to the pool that managed the connection.
Expand Down Expand Up @@ -222,6 +222,7 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
Expand Down Expand Up @@ -263,7 +264,7 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone,
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
let endpoint = info.to_connected_point();
Expand Down Expand Up @@ -298,14 +299,32 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
// Validate the received peer ID as the last step of the pending connection
// future, so that these errors can be raised before the `handler` is consumed
// by the background task, which happens when this future resolves to an
// "established" connection.
let future = future.and_then({
let endpoint = endpoint.clone();
let expected_peer = peer.clone();
let local_id = self.local_id.clone();
move |(info, muxer)| {
if let Some(peer) = expected_peer {
if &peer != info.peer_id() {
return future::err(PendingConnectionError::InvalidPeerId)
}
}

if &local_id == info.peer_id() {
return future::err(PendingConnectionError::InvalidPeerId)
}

let connected = Connected { info, endpoint };
future::ready(Ok((connected, muxer)))
}
});

let id = self.manager.add_pending(future, handler);
self.pending.insert(id, (endpoint, peer));
id
Expand Down Expand Up @@ -536,7 +555,7 @@ where
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
> where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone,
TPeerId: Clone,
TPeerId: Clone
{
loop {
let item = match self.manager.poll(cx) {
Expand All @@ -551,7 +570,7 @@ where
id,
endpoint,
error,
handler: Some(handler),
handler,
peer,
pool: self
})
Expand Down Expand Up @@ -581,39 +600,25 @@ where
.map_or(0, |conns| conns.len());
if let Err(e) = self.limits.check_established(current) {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
let num_established = e.current;
return Poll::Ready(PoolEvent::ConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::ConnectionLimit(e),
connected,
error: ConnectionError::ConnectionLimit(e),
num_established,
pool: self,
handler: None,
})
}
// Check peer ID.
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::InvalidPeerId,
pool: self,
handler: None,
})
// Peer ID checks must already have happened. See `add_pending`.
if cfg!(debug_assertions) {
if &self.local_id == entry.connected().peer_id() {
panic!("Unexpected local peer ID for remote.");
}
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
panic!("Unexpected peer ID mismatch.");
}
}
}
if &self.local_id == entry.connected().peer_id() {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::InvalidPeerId,
pool: self,
handler: None,
})
}
// Add the connection to the pool.
let peer = entry.connected().peer_id().clone();
Expand Down
45 changes: 14 additions & 31 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use std::{
error,
fmt,
hash::Hash,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -331,7 +330,7 @@ where
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: Clone,
TPeerId: AsRef<[u8]> + Send + 'static,
TPeerId: Send + 'static,
{
// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
Expand Down Expand Up @@ -383,7 +382,7 @@ where
}
Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => {
let dialing = &mut self.dialing;
let (next, event) = on_connection_failed(pool, dialing, id, endpoint, error, handler);
let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
if let Some(dial) = next {
let transport = self.listeners.transport().clone();
if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
Expand Down Expand Up @@ -496,13 +495,11 @@ where
/// If the failed connection attempt was a dialing attempt and there
/// are more addresses to try, new `DialingOpts` are returned.
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
pool: &Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
handler: Option<THandler>,
handler: THandler,
) -> (Option<DialingOpts<TPeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>)
where
TTrans: Transport,
Expand All @@ -518,41 +515,27 @@ where

if let Some(peer_id) = dialing_peer {
// A pending outgoing connection to a known peer failed.
let attempt = dialing.remove(&peer_id).expect("by (1)");
let mut attempt = dialing.remove(&peer_id).expect("by (1)");

let num_remain = attempt.next.len();
let failed_addr = attempt.current.clone();

let new_state = if pool.is_connected(&peer_id) {
peer::PeerState::Connected
} else if num_remain == 0 { // (2)
peer::PeerState::Disconnected
} else {
peer::PeerState::Dialing {
num_pending_addresses: NonZeroUsize::new(num_remain).expect("by (2)"),
}
};

let opts =
if let Some(handler) = handler {
if !attempt.next.is_empty() {
let mut attempt = attempt;
let next_attempt = attempt.next.remove(0);
Some(DialingOpts {
peer: peer_id.clone(),
handler,
address: next_attempt,
remaining: attempt.next
})
} else {
None
}
if num_remain > 0 {
let next_attempt = attempt.next.remove(0);
let opts = DialingOpts {
peer: peer_id.clone(),
handler,
address: next_attempt,
remaining: attempt.next
};
Some(opts)
} else {
None
};

(opts, NetworkEvent::DialError {
new_state,
attempts_remaining: num_remain,
peer_id,
multiaddr: failed_addr,
error,
Expand Down
11 changes: 5 additions & 6 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::{
pool::Pool,
},
muxing::StreamMuxer,
network::peer::PeerState,
transport::{Transport, TransportError},
};
use futures::prelude::*;
Expand Down Expand Up @@ -122,8 +121,8 @@ where

/// A dialing attempt to an address of a peer failed.
DialError {
/// New state of a peer.
new_state: PeerState,
/// The number of remaining dialing attempts.
attempts_remaining: usize,

/// Id of the peer we were trying to dial.
peer_id: TPeerId,
Expand All @@ -145,7 +144,7 @@ where

/// The handler that was passed to `dial()`, if the
/// connection failed before the handler was consumed.
handler: Option<THandler>,
handler: THandler,
},

/// An established connection produced an event.
Expand Down Expand Up @@ -219,9 +218,9 @@ where
.field("error", error)
.finish()
}
NetworkEvent::DialError { new_state, peer_id, multiaddr, error } => {
NetworkEvent::DialError { attempts_remaining, peer_id, multiaddr, error } => {
f.debug_struct("DialError")
.field("new_state", new_state)
.field("attempts_remaining", attempts_remaining)
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
Expand Down
18 changes: 0 additions & 18 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,9 @@ use std::{
error,
fmt,
hash::Hash,
num::NonZeroUsize,
};
use super::{Network, DialingOpts};

/// The state of a (remote) peer as seen by the local peer
/// through a [`Network`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PeerState {
/// The [`Network`] is connected to the peer, i.e. has at least one
/// established connection.
Connected,
/// We are currently trying to reach this peer.
Dialing {
/// Number of addresses we are trying to dial.
num_pending_addresses: NonZeroUsize,
},
/// The [`Network`] is disconnected from the peer, i.e. has no
/// established connection and no pending, outgoing connection.
Disconnected,
}

/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Expand Down
Loading