Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dcutr): keep connection alive while we are using it #3960

Merged
merged 19 commits into from
Jun 4, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
See [PR 3715].
- Remove deprecated items. See [PR 3700].

- Keep connection alive while we are using it. See [PR 3960].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3700]: https://github.com/libp2p/rust-libp2p/pull/3700
[PR 3960]: https://github.com/libp2p/rust-libp2p/pull/3960

## 0.9.1

Expand Down
158 changes: 61 additions & 97 deletions protocols/dcutr/src/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm};
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent};
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent,
};
use libp2p_swarm::{
ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError,
THandlerInEvent, ToSwarm,
Expand All @@ -38,7 +40,7 @@ use std::task::{Context, Poll};
use thiserror::Error;
use void::Void;

const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3;
pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3;

/// The events produced by the [`Behaviour`].
#[derive(Debug)]
Expand Down Expand Up @@ -84,6 +86,9 @@ pub struct Behaviour {
/// Indexed by the [`ConnectionId`] of the relayed connection and
/// the [`PeerId`] we are trying to establish a direct connection to.
outgoing_direct_connection_attempts: HashMap<(ConnectionId, PeerId), u8>,

/// The addresses we observed of our peers.
peers_addresses: HashMap<ConnectionId, Multiaddr>,
}

impl Behaviour {
Expand All @@ -95,6 +100,7 @@ impl Behaviour {
local_peer_id,
direct_to_relayed_connections: Default::default(),
outgoing_direct_connection_attempts: Default::default(),
peers_addresses: Default::default(),
}
}

Expand All @@ -107,51 +113,6 @@ impl Behaviour {
.collect()
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
endpoint: connected_point,
..
}: ConnectionEstablished,
) {
if connected_point.is_relayed() {
if connected_point.is_listener() && !self.direct_connections.contains_key(&peer_id) {
// TODO: Try dialing the remote peer directly. Specification:
//
// > The protocol starts with the completion of a relay connection from A to B. Upon
// observing the new connection, the inbound peer (here B) checks the addresses
// advertised by A via identify. If that set includes public addresses, then A may
// be reachable by a direct connection, in which case B attempts a unilateral
// connection upgrade by initiating a direct connection to A.
//
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: Either::Left(handler::relayed::Command::Connect {
obs_addrs: self.observed_addresses(),
}),
},
ToSwarm::GenerateEvent(Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: peer_id,
local_relayed_addr: match connected_point {
ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(),
ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."),
},
}),
]);
}
} else {
self.direct_connections
.entry(peer_id)
.or_default()
.insert(connection_id);
}
}

fn on_dial_failure(
&mut self,
DialFailure {
Expand Down Expand Up @@ -188,22 +149,15 @@ impl Behaviour {
self.queued_events.push_back(ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id,
event: Either::Left(handler::relayed::Command::Connect {
obs_addrs: self.observed_addresses(),
}),
event: Either::Left(handler::relayed::Command::Connect),
})
} else {
self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
},
ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeFailed {
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id,
error: Error::Dial,
}),
]);
},
)]);
}
}

Expand All @@ -216,6 +170,8 @@ impl Behaviour {
..
}: ConnectionClosed<<Self as NetworkBehaviour>::ConnectionHandler>,
) {
self.peers_addresses.remove(&connection_id);

if !connected_point.is_relayed() {
let connections = self
.direct_connections
Expand All @@ -242,18 +198,35 @@ impl NetworkBehaviour for Behaviour {
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
_peer: PeerId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.peers_addresses
.insert(connection_id, remote_addr.clone());

if is_relayed(local_addr) {
return Ok(Either::Left(handler::relayed::Handler::new(
ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
let connected_point = ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
};
let mut handler =
handler::relayed::Handler::new(connected_point, self.observed_addresses());
handler.on_behaviour_event(handler::relayed::Command::Connect);

self.queued_events.extend([ToSwarm::GenerateEvent(
Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: peer,
local_relayed_addr: local_addr.clone(),
},
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
)]);

return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}
self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

assert!(
self.direct_to_relayed_connections
Expand All @@ -272,15 +245,22 @@ impl NetworkBehaviour for Behaviour {
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.peers_addresses.insert(connection_id, addr.clone());
if is_relayed(addr) {
return Ok(Either::Left(handler::relayed::Handler::new(
ConnectedPoint::Dialer {
address: addr.clone(),
role_override,
},
self.observed_addresses(),
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}

self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

// Whether this is a connection requested by this behaviour.
if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id)
{
Expand Down Expand Up @@ -319,24 +299,17 @@ impl NetworkBehaviour for Behaviour {
};

match handler_event {
Either::Left(handler::relayed::Event::InboundConnectRequest {
inbound_connect,
remote_addr,
}) => {
self.queued_events.extend([
ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id: event_source,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
inbound_connect,
obs_addrs: self.observed_addresses(),
}),
},
ToSwarm::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade {
Either::Left(handler::relayed::Event::InboundConnectRequest) => {
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id: event_source,
remote_relayed_addr: remote_addr,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no longer provide the remote_relayed_addr with the RemoteInitiatedDirectConnectionUpgrade event? That would remove the need for explicit state tracking in the NetworkBehaviour implementation and instead store the state close to its source, namely the connection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Friendly ping. Am I missing something? This should eliminate the necessity for maintaining the state of Behaviour::remote_relayed_addr, correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes-ish. The original idea was to not pass data back and forth. The behaviour learns about the address first so it is kind of redundant to pass it to the handler only to then pass it to the behaviour again.

On the flipside, not needing the hashmap removes a few error cases and simplifies the behaviour so I think it is a good idea.

@tcoratger Mind taking a look at this?

The idea would be to remove the peer_addresses hashmap and instead pass the address in the InboundConnectRequest enum from the handler to the behaviour.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thomaseizinger @mxinden Done. I hope I have understood everything correctly.

From what I have in mind, the peers_addresses hashmap of the Behavior structure was used to list peer addresses. It was therefore updated when establishing or closing a connection.

To replace this and avoid some bug-prone cases, we prefer to simplify the behavior and place the logic in the handler instead. So I started by completely removing peers_addresses which is no longer useful. Then I added the remote_addr inside InboundConnectRequest to pass the address information from the handler, closer to the source of the connection.

Don't hesitate if there's something tricky that I didn't catch.

}),
]);
remote_relayed_addr: self
.peers_addresses
.get(&relayed_connection_id)
.cloned()
.expect("to have stored remote addr of relay connection"),
},
)]);
}
Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => {
self.queued_events.push_back(ToSwarm::GenerateEvent(
Expand Down Expand Up @@ -384,18 +357,11 @@ impl NetworkBehaviour for Behaviour {
self.queued_events.push_back(ToSwarm::Dial { opts });
}
Either::Right(Either::Left(handler::direct::Event::DirectConnectionEstablished)) => {
self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id: event_source,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(
handler::relayed::Command::UpgradeFinishedDontKeepAlive,
),
},
ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeSucceeded {
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::DirectConnectionUpgradeSucceeded {
remote_peer_id: event_source,
}),
]);
},
)]);
}
Either::Right(Either::Right(never)) => void::unreachable(never),
};
Expand All @@ -417,14 +383,12 @@ impl NetworkBehaviour for Behaviour {
self.external_addresses.on_swarm_event(&event);

match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::AddressChange(_)
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
Expand Down
Loading