Skip to content

Commit

Permalink
feat(dcutr): keep connection alive while we are using it
Browse files Browse the repository at this point in the history
Similar to #3876, we now compute `connection_keep_alive` based on whether we are still using the connection, applied to the `dcutr` protocol.

Related: #3844.

Pull-Request: #3960.
  • Loading branch information
tcoratger authored Jun 4, 2023
1 parent 75edcfc commit a4450d4
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 190 deletions.
3 changes: 3 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
See [PR 3715].
- Remove deprecated items. See [PR 3700].

- Keep connection alive while we are using it. See [PR 3960].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3700]: https://github.com/libp2p/rust-libp2p/pull/3700
[PR 3960]: https://github.com/libp2p/rust-libp2p/pull/3960

## 0.9.1

Expand Down
140 changes: 46 additions & 94 deletions protocols/dcutr/src/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm};
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent};
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent,
};
use libp2p_swarm::{
ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError,
THandlerInEvent, ToSwarm,
Expand All @@ -38,7 +40,7 @@ use std::task::{Context, Poll};
use thiserror::Error;
use void::Void;

const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3;
pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3;

/// The events produced by the [`Behaviour`].
#[derive(Debug)]
Expand Down Expand Up @@ -107,51 +109,6 @@ impl Behaviour {
.collect()
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
peer_id,
connection_id,
endpoint: connected_point,
..
}: ConnectionEstablished,
) {
if connected_point.is_relayed() {
if connected_point.is_listener() && !self.direct_connections.contains_key(&peer_id) {
// TODO: Try dialing the remote peer directly. Specification:
//
// > The protocol starts with the completion of a relay connection from A to B. Upon
// observing the new connection, the inbound peer (here B) checks the addresses
// advertised by A via identify. If that set includes public addresses, then A may
// be reachable by a direct connection, in which case B attempts a unilateral
// connection upgrade by initiating a direct connection to A.
//
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: Either::Left(handler::relayed::Command::Connect {
obs_addrs: self.observed_addresses(),
}),
},
ToSwarm::GenerateEvent(Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: peer_id,
local_relayed_addr: match connected_point {
ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(),
ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."),
},
}),
]);
}
} else {
self.direct_connections
.entry(peer_id)
.or_default()
.insert(connection_id);
}
}

fn on_dial_failure(
&mut self,
DialFailure {
Expand Down Expand Up @@ -188,22 +145,15 @@ impl Behaviour {
self.queued_events.push_back(ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id,
event: Either::Left(handler::relayed::Command::Connect {
obs_addrs: self.observed_addresses(),
}),
event: Either::Left(handler::relayed::Command::Connect),
})
} else {
self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
},
ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeFailed {
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id,
error: Error::Dial,
}),
]);
},
)]);
}
}

Expand Down Expand Up @@ -239,18 +189,32 @@ impl NetworkBehaviour for Behaviour {
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
_peer: PeerId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
if is_relayed(local_addr) {
return Ok(Either::Left(handler::relayed::Handler::new(
ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
let connected_point = ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
};
let mut handler =
handler::relayed::Handler::new(connected_point, self.observed_addresses());
handler.on_behaviour_event(handler::relayed::Command::Connect);

self.queued_events.extend([ToSwarm::GenerateEvent(
Event::InitiatedDirectConnectionUpgrade {
remote_peer_id: peer,
local_relayed_addr: local_addr.clone(),
},
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
)]);

return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}
self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

assert!(
self.direct_to_relayed_connections
Expand All @@ -275,9 +239,15 @@ impl NetworkBehaviour for Behaviour {
address: addr.clone(),
role_override,
},
self.observed_addresses(),
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}

self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

// Whether this is a connection requested by this behaviour.
if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id)
{
Expand All @@ -290,16 +260,11 @@ impl NetworkBehaviour for Behaviour {
);
}

self.queued_events.extend([
ToSwarm::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
},
ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeSucceeded {
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::DirectConnectionUpgradeSucceeded {
remote_peer_id: peer,
}),
]);
},
)]);
}

Ok(Either::Right(dummy::ConnectionHandler))
Expand All @@ -323,24 +288,13 @@ impl NetworkBehaviour for Behaviour {
};

match handler_event {
Either::Left(handler::relayed::Event::InboundConnectRequest {
inbound_connect,
remote_addr,
}) => {
self.queued_events.extend([
ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id),
peer_id: event_source,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
inbound_connect,
obs_addrs: self.observed_addresses(),
}),
},
ToSwarm::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade {
Either::Left(handler::relayed::Event::InboundConnectRequest { remote_addr }) => {
self.queued_events.extend([ToSwarm::GenerateEvent(
Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id: event_source,
remote_relayed_addr: remote_addr,
}),
]);
},
)]);
}
Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => {
self.queued_events.push_back(ToSwarm::GenerateEvent(
Expand Down Expand Up @@ -407,14 +361,12 @@ impl NetworkBehaviour for Behaviour {
self.external_addresses.on_swarm_event(&event);

match event {
FromSwarm::ConnectionEstablished(connection_established) => {
self.on_connection_established(connection_established)
}
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::AddressChange(_)
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
Expand Down
Loading

0 comments on commit a4450d4

Please sign in to comment.