From d17111c19fe6281511e3059d31d523af4f25d5cb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:28:54 +1100 Subject: [PATCH 01/15] Rewrite dcutr handler --- Cargo.lock | 1 + misc/metrics/src/dcutr.rs | 10 -- protocols/dcutr/Cargo.toml | 1 + protocols/dcutr/src/behaviour.rs | 56 +++--- protocols/dcutr/src/handler/relayed.rs | 211 +++++++++++------------ protocols/dcutr/src/lib.rs | 4 +- protocols/dcutr/src/protocol/inbound.rs | 157 ++++++++--------- protocols/dcutr/src/protocol/outbound.rs | 168 +++++++++--------- protocols/dcutr/tests/lib.rs | 20 --- 9 files changed, 278 insertions(+), 350 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4fd13cfccd..cad7380ca09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2498,6 +2498,7 @@ dependencies = [ "either", "env_logger 0.10.0", "futures", + "futures-bounded", "futures-timer", "instant", "libp2p-core", diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index dc15e1f838d..a56688c2a95 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -49,8 +49,6 @@ struct EventLabels { #[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)] enum EventType { - InitiateDirectConnectionUpgrade, - RemoteInitiatedDirectConnectionUpgrade, DirectConnectionUpgradeSucceeded, DirectConnectionUpgradeFailed, } @@ -58,14 +56,6 @@ enum EventType { impl From<&libp2p_dcutr::Event> for EventType { fn from(event: &libp2p_dcutr::Event) -> Self { match event { - libp2p_dcutr::Event::InitiatedDirectConnectionUpgrade { - remote_peer_id: _, - local_relayed_addr: _, - } => EventType::InitiateDirectConnectionUpgrade, - libp2p_dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: _, - remote_relayed_addr: _, - } => EventType::RemoteInitiatedDirectConnectionUpgrade, libp2p_dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id: _, connection_id: _, diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index ce038b4b5b7..a456dfd0e8d 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -25,6 +25,7 @@ quick-protobuf-codec = { workspace = true } thiserror = "1.0" void = "1" lru = "0.11.1" +futures-bounded = { workspace = true } [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 72b30421346..f4dddb8f052 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -20,7 +20,7 @@ //! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. -use crate::handler; +use crate::{handler, protocol}; use either::Either; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Protocol; @@ -32,7 +32,7 @@ use libp2p_swarm::{ dummy, ConnectionDenied, ConnectionHandler, ConnectionId, NewExternalAddrCandidate, THandler, THandlerOutEvent, }; -use libp2p_swarm::{NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm}; +use libp2p_swarm::{NetworkBehaviour, NotifyHandler, THandlerInEvent, ToSwarm}; use lru::LruCache; use std::collections::{HashMap, HashSet, VecDeque}; use std::num::NonZeroUsize; @@ -45,14 +45,14 @@ pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; /// The events produced by the [`Behaviour`]. #[derive(Debug)] pub enum Event { - InitiatedDirectConnectionUpgrade { - remote_peer_id: PeerId, - local_relayed_addr: Multiaddr, - }, - RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: PeerId, - remote_relayed_addr: Multiaddr, - }, + // InitiatedDirectConnectionUpgrade { + // remote_peer_id: PeerId, + // local_relayed_addr: Multiaddr, + // }, + // RemoteInitiatedDirectConnectionUpgrade { + // remote_peer_id: PeerId, + // remote_relayed_addr: Multiaddr, + // }, DirectConnectionUpgradeSucceeded { remote_peer_id: PeerId, connection_id: ConnectionId, @@ -68,8 +68,10 @@ pub enum Event { pub enum Error { #[error("Failed to dial peer.")] Dial, - #[error("Failed to establish substream: {0}.")] - Handler(StreamUpgradeError), + #[error("Outbound handshake failed: {0}.")] + Outbound(protocol::outbound::Error), + #[error("Inbound handshake failed: {0}.")] + Inbound(protocol::inbound::Error), } pub struct Behaviour { @@ -197,13 +199,6 @@ impl NetworkBehaviour for Behaviour { handler::relayed::Handler::new(connected_point, self.observed_addresses()); handler.on_behaviour_event(handler::relayed::Command::Connect); - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::InitiatedDirectConnectionUpgrade { - remote_peer_id: peer, - local_relayed_addr: local_addr.clone(), - }, - )]); - return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound. } self.direct_connections @@ -284,15 +279,7 @@ impl NetworkBehaviour for Behaviour { }; match handler_event { - Either::Left(handler::relayed::Event::InboundConnectRequest { remote_addr }) => { - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: event_source, - remote_relayed_addr: remote_addr, - }, - )]); - } - Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { + Either::Left(handler::relayed::Event::InboundConnectNegotiated { remote_addrs }) => { log::debug!( "Attempting to hole-punch as dialer to {event_source} using {remote_addrs:?}" ); @@ -308,15 +295,24 @@ impl NetworkBehaviour for Behaviour { .insert(maybe_direct_connection_id, relayed_connection_id); self.queued_events.push_back(ToSwarm::Dial { opts }); } - Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { + Either::Left(handler::relayed::Event::OutboundConnectFailed { error }) => { self.queued_events.push_back(ToSwarm::GenerateEvent( Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source, connection_id: relayed_connection_id, - error: Error::Handler(error), + error: Error::Outbound(error), }, )); } + Either::Left(handler::relayed::Event::InboundConnectFailed { error }) => { + self.queued_events.push_back(ToSwarm::GenerateEvent( + Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + connection_id: relayed_connection_id, + error: Error::Inbound(error), + }, + )) + } Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { log::debug!( "Attempting to hole-punch as listener to {event_source} using {remote_addrs:?}" diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 23ab9f4ae5a..dea41ae3591 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -21,22 +21,25 @@ //! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection. use crate::behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS; -use crate::protocol; +use crate::{protocol, PROTOCOL_NAME}; use either::Either; use futures::future; -use futures::future::{BoxFuture, FutureExt}; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::upgrade::DeniedUpgrade; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, }; +use protocol::{inbound, outbound}; use std::collections::VecDeque; +use std::io; use std::task::{Context, Poll}; +use std::time::Duration; #[derive(Debug)] pub enum Command { @@ -45,26 +48,14 @@ pub enum Command { #[derive(Debug)] pub enum Event { - InboundConnectRequest { - remote_addr: Multiaddr, - }, - InboundConnectNegotiated(Vec), - OutboundNegotiationFailed { - error: StreamUpgradeError, - }, - OutboundConnectNegotiated { - remote_addrs: Vec, - }, + InboundConnectNegotiated { remote_addrs: Vec }, + InboundConnectFailed { error: inbound::Error }, + OutboundConnectNegotiated { remote_addrs: Vec }, + OutboundConnectFailed { error: outbound::Error }, } pub struct Handler { endpoint: ConnectedPoint, - /// A pending fatal error that results in the connection being closed. - pending_error: Option< - StreamUpgradeError< - Either, - >, - >, /// Queue of events to return when polled. queued_events: VecDeque< ConnectionHandlerEvent< @@ -74,9 +65,12 @@ pub struct Handler { ::Error, >, >, - /// Inbound connect, accepted by the behaviour, pending completion. - inbound_connect: - Option, protocol::inbound::UpgradeError>>>, + + // Inbound DCUtR handshakes + inbound_stream: futures_bounded::FuturesSet, inbound::Error>>, + + // Outbound DCUtR handshake. + outbound_stream: futures_bounded::FuturesSet, outbound::Error>>, /// The addresses we will send to the other party for hole-punching attempts. holepunch_candidates: Vec, @@ -88,9 +82,9 @@ impl Handler { pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec) -> Self { Self { endpoint, - pending_error: Default::default(), queued_events: Default::default(), - inbound_connect: Default::default(), + inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), + outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), holepunch_candidates, attempts: 0, } @@ -106,29 +100,19 @@ impl Handler { >, ) { match output { - future::Either::Left(inbound_connect) => { + future::Either::Left(stream) => { if self - .inbound_connect - .replace( - inbound_connect - .accept(self.holepunch_candidates.clone()) - .boxed(), - ) - .is_some() + .inbound_stream + .try_push(inbound::handshake( + stream, + self.holepunch_candidates.clone(), + )) + .is_err() { log::warn!( - "New inbound connect stream while still upgrading previous one. \ - Replacing previous with new.", + "New inbound connect stream while still upgrading previous one. Replacing previous with new.", ); } - let remote_addr = match &self.endpoint { - ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), - ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), - }; - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectRequest { remote_addr }, - )); self.attempts += 1; } // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. @@ -139,8 +123,7 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, FullyNegotiatedOutbound { - protocol: protocol::outbound::Connect { obs_addrs }, - .. + protocol: stream, .. }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, @@ -150,12 +133,18 @@ impl Handler { self.endpoint.is_listener(), "A connection dialer never initiates a connection upgrade." ); - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectNegotiated { - remote_addrs: obs_addrs, - }, - )); + if self + .outbound_stream + .try_push(outbound::handshake( + stream, + self.holepunch_candidates.clone(), + )) + .is_err() + { + log::warn!( + "New outbound connect stream while still upgrading previous one. Replacing previous with new.", + ); + } } fn on_listen_upgrade_error( @@ -165,10 +154,7 @@ impl Handler { ::InboundProtocol, >, ) { - self.pending_error = Some(StreamUpgradeError::Apply(match error { - Either::Left(e) => Either::Left(e), - Either::Right(v) => void::unreachable(v), - })); + void::unreachable(error.into_inner()); } fn on_dial_upgrade_error( @@ -178,50 +164,34 @@ impl Handler { ::OutboundProtocol, >, ) { - match error { - StreamUpgradeError::Timeout => { - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundNegotiationFailed { - error: StreamUpgradeError::Timeout, - }, - )); - } - StreamUpgradeError::NegotiationFailed => { - // The remote merely doesn't support the DCUtR protocol. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundNegotiationFailed { - error: StreamUpgradeError::NegotiationFailed, - }, - )); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error.map_upgrade_err(Either::Right)); - } - } + let error = match error { + StreamUpgradeError::Apply(v) => void::unreachable(v), + StreamUpgradeError::NegotiationFailed => outbound::Error::Unsupported, + StreamUpgradeError::Io(e) => outbound::Error::Io(e), + StreamUpgradeError::Timeout => outbound::Error::Io(io::ErrorKind::TimedOut.into()), + }; + + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { error }, + )) } } impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type Error = StreamUpgradeError< - Either, - >; - type InboundProtocol = Either; - type OutboundProtocol = protocol::outbound::Upgrade; + type Error = + StreamUpgradeError>; + type InboundProtocol = Either, DeniedUpgrade>; + type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { match self.endpoint { ConnectedPoint::Dialer { .. } => { - SubstreamProtocol::new(Either::Left(protocol::inbound::Upgrade {}), ()) + SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), ()) } ConnectedPoint::Listener { .. } => { // By the protocol specification the listening side of a relayed connection @@ -239,10 +209,7 @@ impl ConnectionHandler for Handler { Command::Connect => { self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - protocol::outbound::Upgrade::new(self.holepunch_candidates.clone()), - (), - ), + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), }); self.attempts += 1; } @@ -268,31 +235,55 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - // Return queued events. if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } - if let Some(Poll::Ready(result)) = self.inbound_connect.as_mut().map(|f| f.poll_unpin(cx)) { - self.inbound_connect = None; - match result { - Ok(addresses) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectNegotiated(addresses), - )); - } - Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( - Either::Left(e), - ))) - } + match self.inbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectFailed { error }, + )) + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectFailed { + error: inbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) + } + Poll::Pending => {} + } + + match self.outbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { error }, + )) + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { + error: outbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) } + Poll::Pending => {} } Poll::Pending diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 389365f94c5..7c5d28aba19 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -36,8 +36,8 @@ mod proto { pub use behaviour::{Behaviour, Error, Event}; pub use protocol::PROTOCOL_NAME; pub mod inbound { - pub use crate::protocol::inbound::UpgradeError; + pub use crate::protocol::inbound::ProtocolViolation; } pub mod outbound { - pub use crate::protocol::outbound::UpgradeError; + pub use crate::protocol::outbound::ProtocolViolation; } diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index d38b6f4559a..acd8d60f754 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -20,114 +20,95 @@ use crate::proto; use asynchronous_codec::Framed; -use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::{Stream, StreamProtocol}; +use futures::prelude::*; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use libp2p_swarm::Stream; use std::convert::TryFrom; -use std::iter; +use std::io; use thiserror::Error; -pub struct Upgrade {} - -impl upgrade::UpgradeInfo for Upgrade { - type Info = StreamProtocol; - type InfoIter = iter::Once; +pub(crate) async fn handshake( + stream: Stream, + candidates: Vec, +) -> Result, Error> { + let mut stream = Framed::new( + stream, + quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), + ); + + let proto::HolePunch { type_pb, ObsAddrs } = stream + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; + + let obs_addrs = if ObsAddrs.is_empty() { + return Err(Error::Protocol(ProtocolViolation::NoAddresses)); + } else { + ObsAddrs + .into_iter() + .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { + Ok(a) => Some(a), + Err(e) => { + log::debug!("Unable to parse multiaddr: {e}"); + None + } + }) + // Filter out relayed addresses. + .filter(|a| { + if a.iter().any(|p| p == Protocol::P2pCircuit) { + log::debug!("Dropping relayed address {a}"); + false + } else { + true + } + }) + .collect::>() + }; - fn protocol_info(&self) -> Self::InfoIter { - iter::once(super::PROTOCOL_NAME) + match type_pb { + proto::Type::CONNECT => {} + proto::Type::SYNC => return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)), } -} - -impl upgrade::InboundUpgrade for Upgrade { - type Output = PendingConnect; - type Error = UpgradeError; - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future { - let mut substream = Framed::new( - substream, - quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), - ); - - async move { - let proto::HolePunch { type_pb, ObsAddrs } = - substream.next().await.ok_or(UpgradeError::StreamClosed)??; - let obs_addrs = if ObsAddrs.is_empty() { - return Err(UpgradeError::NoAddresses); - } else { - ObsAddrs - .into_iter() - .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { - Ok(a) => Some(a), - Err(e) => { - log::debug!("Unable to parse multiaddr: {e}"); - None - } - }) - // Filter out relayed addresses. - .filter(|a| { - if a.iter().any(|p| p == Protocol::P2pCircuit) { - log::debug!("Dropping relayed address {a}"); - false - } else { - true - } - }) - .collect::>() - }; + let msg = proto::HolePunch { + type_pb: proto::Type::CONNECT, + ObsAddrs: candidates.into_iter().map(|a| a.to_vec()).collect(), + }; - match type_pb { - proto::Type::CONNECT => {} - proto::Type::SYNC => return Err(UpgradeError::UnexpectedTypeSync), - } + stream.send(msg).await?; + let proto::HolePunch { type_pb, .. } = stream + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; - Ok(PendingConnect { - substream, - remote_obs_addrs: obs_addrs, - }) + match type_pb { + proto::Type::CONNECT => { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect)) } - .boxed() + proto::Type::SYNC => {} } -} -pub struct PendingConnect { - substream: Framed>, - remote_obs_addrs: Vec, + Ok(obs_addrs) } -impl PendingConnect { - pub async fn accept( - mut self, - local_obs_addrs: Vec, - ) -> Result, UpgradeError> { - let msg = proto::HolePunch { - type_pb: proto::Type::CONNECT, - ObsAddrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(), - }; - - self.substream.send(msg).await?; - let proto::HolePunch { type_pb, .. } = self - .substream - .next() - .await - .ok_or(UpgradeError::StreamClosed)??; - - match type_pb { - proto::Type::CONNECT => return Err(UpgradeError::UnexpectedTypeConnect), - proto::Type::SYNC => {} - } +#[derive(Debug, Error)] +pub enum Error { + #[error("IO error")] + Io(#[from] io::Error), + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), +} - Ok(self.remote_obs_addrs) +impl From for Error { + fn from(e: quick_protobuf_codec::Error) -> Self { + Error::Protocol(ProtocolViolation::Codec(e)) } } #[derive(Debug, Error)] -pub enum UpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Expected at least one address in reservation.")] NoAddresses, #[error("Failed to parse response type field.")] diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 960d98cbe66..018f6dba650 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -19,115 +19,103 @@ // DEALINGS IN THE SOFTWARE. use crate::proto; +use crate::PROTOCOL_NAME; use asynchronous_codec::Framed; -use futures::{future::BoxFuture, prelude::*}; +use futures::prelude::*; use futures_timer::Delay; use instant::Instant; -use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::{Stream, StreamProtocol}; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use libp2p_swarm::Stream; use std::convert::TryFrom; -use std::iter; +use std::io; use thiserror::Error; -pub struct Upgrade { - obs_addrs: Vec, -} +pub(crate) async fn handshake( + stream: Stream, + candidates: Vec, +) -> Result, Error> { + let mut stream = Framed::new( + stream, + quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), + ); -impl upgrade::UpgradeInfo for Upgrade { - type Info = StreamProtocol; - type InfoIter = iter::Once; + let msg = proto::HolePunch { + type_pb: proto::Type::CONNECT, + ObsAddrs: candidates.into_iter().map(|a| a.to_vec()).collect(), + }; - fn protocol_info(&self) -> Self::InfoIter { - iter::once(super::PROTOCOL_NAME) - } -} + stream.send(msg).await?; + + let sent_time = Instant::now(); + + let proto::HolePunch { type_pb, ObsAddrs } = stream + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; -impl Upgrade { - pub fn new(obs_addrs: Vec) -> Self { - Self { obs_addrs } + let rtt = sent_time.elapsed(); + + match type_pb { + proto::Type::CONNECT => {} + proto::Type::SYNC => return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)), } + + let obs_addrs = if ObsAddrs.is_empty() { + return Err(Error::Protocol(ProtocolViolation::NoAddresses)); + } else { + ObsAddrs + .into_iter() + .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { + Ok(a) => Some(a), + Err(e) => { + log::debug!("Unable to parse multiaddr: {e}"); + None + } + }) + // Filter out relayed addresses. + .filter(|a| { + if a.iter().any(|p| p == Protocol::P2pCircuit) { + log::debug!("Dropping relayed address {a}"); + false + } else { + true + } + }) + .collect::>() + }; + + let msg = proto::HolePunch { + type_pb: proto::Type::SYNC, + ObsAddrs: vec![], + }; + + stream.send(msg).await?; + + Delay::new(rtt / 2).await; + + Ok(obs_addrs) } -impl upgrade::OutboundUpgrade for Upgrade { - type Output = Connect; - type Error = UpgradeError; - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future { - let mut substream = Framed::new( - substream, - quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), - ); - - let msg = proto::HolePunch { - type_pb: proto::Type::CONNECT, - ObsAddrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(), - }; - - async move { - substream.send(msg).await?; - - let sent_time = Instant::now(); - - let proto::HolePunch { type_pb, ObsAddrs } = - substream.next().await.ok_or(UpgradeError::StreamClosed)??; - - let rtt = sent_time.elapsed(); - - match type_pb { - proto::Type::CONNECT => {} - proto::Type::SYNC => return Err(UpgradeError::UnexpectedTypeSync), - } - - let obs_addrs = if ObsAddrs.is_empty() { - return Err(UpgradeError::NoAddresses); - } else { - ObsAddrs - .into_iter() - .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { - Ok(a) => Some(a), - Err(e) => { - log::debug!("Unable to parse multiaddr: {e}"); - None - } - }) - // Filter out relayed addresses. - .filter(|a| { - if a.iter().any(|p| p == Protocol::P2pCircuit) { - log::debug!("Dropping relayed address {a}"); - false - } else { - true - } - }) - .collect::>() - }; - - let msg = proto::HolePunch { - type_pb: proto::Type::SYNC, - ObsAddrs: vec![], - }; - - substream.send(msg).await?; - - Delay::new(rtt / 2).await; - - Ok(Connect { obs_addrs }) - } - .boxed() - } +#[derive(Debug, Error)] +pub enum Error { + #[error("IO error")] + Io(#[from] io::Error), + #[error("Remote does not support the `{PROTOCOL_NAME}` protocol")] + Unsupported, + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), } -pub struct Connect { - pub obs_addrs: Vec, +impl From for Error { + fn from(e: quick_protobuf_codec::Error) -> Self { + Error::Protocol(ProtocolViolation::Codec(e)) + } } #[derive(Debug, Error)] -pub enum UpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Expected 'reservation' field to be set.")] diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 6078b101fa2..ce7cae76e09 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -75,26 +75,6 @@ async fn connect() { src.dial_and_wait(dst_relayed_addr.clone()).await; - loop { - match src - .next_swarm_event() - .await - .try_into_behaviour_event() - .unwrap() - { - ClientEvent::Dcutr(dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id, - remote_relayed_addr, - }) => { - if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr { - break; - } - } - ClientEvent::Identify(_) => {} - other => panic!("Unexpected event: {other:?}."), - } - } - let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id)); let established_conn_id = src From cb070f141bc981de8ab1d8bf15c7fa4c848bd7e5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:30:11 +1100 Subject: [PATCH 02/15] Prefer early exit --- protocols/dcutr/src/protocol/inbound.rs | 42 +++++++++++----------- protocols/dcutr/src/protocol/outbound.rs | 44 ++++++++++++------------ 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index acd8d60f754..d3d343b3d8c 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -41,30 +41,30 @@ pub(crate) async fn handshake( .await .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; - let obs_addrs = if ObsAddrs.is_empty() { + if ObsAddrs.is_empty() { return Err(Error::Protocol(ProtocolViolation::NoAddresses)); - } else { - ObsAddrs - .into_iter() - .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { - Ok(a) => Some(a), - Err(e) => { - log::debug!("Unable to parse multiaddr: {e}"); - None - } - }) - // Filter out relayed addresses. - .filter(|a| { - if a.iter().any(|p| p == Protocol::P2pCircuit) { - log::debug!("Dropping relayed address {a}"); - false - } else { - true - } - }) - .collect::>() }; + let obs_addrs = ObsAddrs + .into_iter() + .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { + Ok(a) => Some(a), + Err(e) => { + log::debug!("Unable to parse multiaddr: {e}"); + None + } + }) + // Filter out relayed addresses. + .filter(|a| { + if a.iter().any(|p| p == Protocol::P2pCircuit) { + log::debug!("Dropping relayed address {a}"); + false + } else { + true + } + }) + .collect(); + match type_pb { proto::Type::CONNECT => {} proto::Type::SYNC => return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)), diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 018f6dba650..ddaea9f37c5 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -60,29 +60,29 @@ pub(crate) async fn handshake( proto::Type::SYNC => return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)), } - let obs_addrs = if ObsAddrs.is_empty() { + if ObsAddrs.is_empty() { return Err(Error::Protocol(ProtocolViolation::NoAddresses)); - } else { - ObsAddrs - .into_iter() - .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { - Ok(a) => Some(a), - Err(e) => { - log::debug!("Unable to parse multiaddr: {e}"); - None - } - }) - // Filter out relayed addresses. - .filter(|a| { - if a.iter().any(|p| p == Protocol::P2pCircuit) { - log::debug!("Dropping relayed address {a}"); - false - } else { - true - } - }) - .collect::>() - }; + } + + let obs_addrs = ObsAddrs + .into_iter() + .filter_map(|a| match Multiaddr::try_from(a.to_vec()) { + Ok(a) => Some(a), + Err(e) => { + log::debug!("Unable to parse multiaddr: {e}"); + None + } + }) + // Filter out relayed addresses. + .filter(|a| { + if a.iter().any(|p| p == Protocol::P2pCircuit) { + log::debug!("Dropping relayed address {a}"); + false + } else { + true + } + }) + .collect(); let msg = proto::HolePunch { type_pb: proto::Type::SYNC, From 98cbab9faff49bdfb9b41abb941e898449729097 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:31:27 +1100 Subject: [PATCH 03/15] Use `matches!` for shorter checks --- protocols/dcutr/src/protocol/inbound.rs | 12 ++++-------- protocols/dcutr/src/protocol/outbound.rs | 5 ++--- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index d3d343b3d8c..95665843724 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -65,9 +65,8 @@ pub(crate) async fn handshake( }) .collect(); - match type_pb { - proto::Type::CONNECT => {} - proto::Type::SYNC => return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)), + if !matches!(type_pb, proto::Type::CONNECT) { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)); } let msg = proto::HolePunch { @@ -81,11 +80,8 @@ pub(crate) async fn handshake( .await .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; - match type_pb { - proto::Type::CONNECT => { - return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect)) - } - proto::Type::SYNC => {} + if !matches!(type_pb, proto::Type::SYNC) { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect)); } Ok(obs_addrs) diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index ddaea9f37c5..67c7116d706 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -55,9 +55,8 @@ pub(crate) async fn handshake( let rtt = sent_time.elapsed(); - match type_pb { - proto::Type::CONNECT => {} - proto::Type::SYNC => return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)), + if !matches!(type_pb, proto::Type::CONNECT) { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeSync)); } if ObsAddrs.is_empty() { From 39361164e00e146bdbfe42a9bf103a4dd93c4028 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:32:10 +1100 Subject: [PATCH 04/15] Set error to `Void` --- protocols/dcutr/src/handler/relayed.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index dea41ae3591..b607764fbd5 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -40,6 +40,7 @@ use std::collections::VecDeque; use std::io; use std::task::{Context, Poll}; use std::time::Duration; +use void::Void; #[derive(Debug)] pub enum Command { @@ -181,8 +182,7 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type Error = - StreamUpgradeError>; + type Error = Void; type InboundProtocol = Either, DeniedUpgrade>; type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); From dc94ff61383a533d4a0ed87498a0e890b08c0aab Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:33:08 +1100 Subject: [PATCH 05/15] Remove unused events --- protocols/dcutr/src/behaviour.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index f4dddb8f052..40a2b4794fe 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -45,14 +45,6 @@ pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; /// The events produced by the [`Behaviour`]. #[derive(Debug)] pub enum Event { - // InitiatedDirectConnectionUpgrade { - // remote_peer_id: PeerId, - // local_relayed_addr: Multiaddr, - // }, - // RemoteInitiatedDirectConnectionUpgrade { - // remote_peer_id: PeerId, - // remote_relayed_addr: Multiaddr, - // }, DirectConnectionUpgradeSucceeded { remote_peer_id: PeerId, connection_id: ConnectionId, From 0e33cff77785ce3053cee4775ac5ef5c80fbc27c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:34:21 +1100 Subject: [PATCH 06/15] Rename error variant --- protocols/dcutr/src/behaviour.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 40a2b4794fe..9f9ee76c10f 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -58,8 +58,8 @@ pub enum Event { #[derive(Debug, Error)] pub enum Error { - #[error("Failed to dial peer.")] - Dial, + #[error("Failed to hole-punch after {MAX_NUMBER_OF_UPGRADE_ATTEMPTS}")] + AttemptsExceeded, #[error("Outbound handshake failed: {0}.")] Outbound(protocol::outbound::Error), #[error("Inbound handshake failed: {0}.")] @@ -140,7 +140,7 @@ impl Behaviour { Event::DirectConnectionUpgradeFailed { remote_peer_id: peer_id, connection_id: failed_direct_connection, - error: Error::Dial, + error: Error::AttemptsExceeded, }, )]); } From 5361e8311dfbe52054bc2476eb0db27f036a437c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:38:01 +1100 Subject: [PATCH 07/15] Turn `dcutr::Event` into a struct Results in shorter names. --- hole-punching-tests/src/main.rs | 23 ++++++------ misc/metrics/src/dcutr.rs | 7 ++-- protocols/dcutr/src/behaviour.rs | 61 +++++++++++++------------------- protocols/dcutr/tests/lib.rs | 8 +++-- 4 files changed, 43 insertions(+), 56 deletions(-) diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index fd4616e1629..1c63e530fed 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -102,12 +102,11 @@ async fn main() -> Result<()> { .await?; } ( - SwarmEvent::Behaviour(BehaviourEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeSucceeded { - remote_peer_id, - connection_id, - }, - )), + SwarmEvent::Behaviour(BehaviourEvent::Dcutr(dcutr::Event { + remote_peer_id, + connection_id, + result: Ok(()), + })), _, ) => { log::info!("Successfully hole-punched to {remote_peer_id}"); @@ -127,13 +126,11 @@ async fn main() -> Result<()> { return Ok(()); } ( - SwarmEvent::Behaviour(BehaviourEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeFailed { - remote_peer_id, - error, - .. - }, - )), + SwarmEvent::Behaviour(BehaviourEvent::Dcutr(dcutr::Event { + remote_peer_id, + result: Err(error), + .. + })), _, ) => { log::info!("Failed to hole-punched to {remote_peer_id}"); diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index a56688c2a95..04b2506edba 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -56,14 +56,15 @@ enum EventType { impl From<&libp2p_dcutr::Event> for EventType { fn from(event: &libp2p_dcutr::Event) -> Self { match event { - libp2p_dcutr::Event::DirectConnectionUpgradeSucceeded { + libp2p_dcutr::Event { remote_peer_id: _, connection_id: _, + result: Ok(()), } => EventType::DirectConnectionUpgradeSucceeded, - libp2p_dcutr::Event::DirectConnectionUpgradeFailed { + libp2p_dcutr::Event { remote_peer_id: _, connection_id: _, - error: _, + result: Err(_), } => EventType::DirectConnectionUpgradeFailed, } } diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 9f9ee76c10f..db73d872410 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -44,16 +44,10 @@ pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; /// The events produced by the [`Behaviour`]. #[derive(Debug)] -pub enum Event { - DirectConnectionUpgradeSucceeded { - remote_peer_id: PeerId, - connection_id: ConnectionId, - }, - DirectConnectionUpgradeFailed { - remote_peer_id: PeerId, - connection_id: ConnectionId, - error: Error, - }, +pub struct Event { + pub remote_peer_id: PeerId, + pub connection_id: ConnectionId, + pub result: Result<(), Error>, } #[derive(Debug, Error)] @@ -136,13 +130,11 @@ impl Behaviour { event: Either::Left(handler::relayed::Command::Connect), }) } else { - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: peer_id, - connection_id: failed_direct_connection, - error: Error::AttemptsExceeded, - }, - )]); + self.queued_events.extend([ToSwarm::GenerateEvent(Event { + remote_peer_id: peer_id, + connection_id: failed_direct_connection, + result: Err(Error::AttemptsExceeded), + })]); } } @@ -242,12 +234,11 @@ impl NetworkBehaviour for Behaviour { ); } - self.queued_events.extend([ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeSucceeded { - remote_peer_id: peer, - connection_id, - }, - )]); + self.queued_events.extend([ToSwarm::GenerateEvent(Event { + remote_peer_id: peer, + connection_id, + result: Ok(()), + })]); } Ok(Either::Right(dummy::ConnectionHandler)) @@ -288,22 +279,18 @@ impl NetworkBehaviour for Behaviour { self.queued_events.push_back(ToSwarm::Dial { opts }); } Either::Left(handler::relayed::Event::OutboundConnectFailed { error }) => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: event_source, - connection_id: relayed_connection_id, - error: Error::Outbound(error), - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + connection_id: relayed_connection_id, + result: Err(Error::Outbound(error)), + })); } Either::Left(handler::relayed::Event::InboundConnectFailed { error }) => { - self.queued_events.push_back(ToSwarm::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: event_source, - connection_id: relayed_connection_id, - error: Error::Inbound(error), - }, - )) + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + connection_id: relayed_connection_id, + result: Err(Error::Inbound(error)), + })) } Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { log::debug!( diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index ce7cae76e09..1bd40f2d522 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -90,9 +90,11 @@ async fn connect() { let reported_conn_id = src .wait(move |e| match e { - SwarmEvent::Behaviour(ClientEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeSucceeded { connection_id, .. }, - )) => Some(connection_id), + SwarmEvent::Behaviour(ClientEvent::Dcutr(dcutr::Event { + connection_id, + result: Ok(()), + .. + })) => Some(connection_id), _ => None, }) .await; From 9264d2d65636c6163bfd11fa6d88a0c19e893799 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:49:50 +1100 Subject: [PATCH 08/15] Don't report errors for handshakes --- protocols/dcutr/src/behaviour.rs | 29 +++----- protocols/dcutr/src/handler/relayed.rs | 89 ++++++++++++------------- protocols/dcutr/src/protocol/inbound.rs | 2 +- 3 files changed, 52 insertions(+), 68 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index db73d872410..01eca318b33 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -20,7 +20,7 @@ //! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. -use crate::{handler, protocol}; +use crate::handler; use either::Either; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Protocol; @@ -51,13 +51,9 @@ pub struct Event { } #[derive(Debug, Error)] -pub enum Error { - #[error("Failed to hole-punch after {MAX_NUMBER_OF_UPGRADE_ATTEMPTS}")] - AttemptsExceeded, - #[error("Outbound handshake failed: {0}.")] - Outbound(protocol::outbound::Error), - #[error("Inbound handshake failed: {0}.")] - Inbound(protocol::inbound::Error), +#[error("Failed to hole-punch after {attempts} attempts")] +pub struct Error { + attempts: u8, } pub struct Behaviour { @@ -133,7 +129,9 @@ impl Behaviour { self.queued_events.extend([ToSwarm::GenerateEvent(Event { remote_peer_id: peer_id, connection_id: failed_direct_connection, - result: Err(Error::AttemptsExceeded), + result: Err(Error { + attempts: MAX_NUMBER_OF_UPGRADE_ATTEMPTS, + }), })]); } } @@ -279,18 +277,7 @@ impl NetworkBehaviour for Behaviour { self.queued_events.push_back(ToSwarm::Dial { opts }); } Either::Left(handler::relayed::Event::OutboundConnectFailed { error }) => { - self.queued_events.push_back(ToSwarm::GenerateEvent(Event { - remote_peer_id: event_source, - connection_id: relayed_connection_id, - result: Err(Error::Outbound(error)), - })); - } - Either::Left(handler::relayed::Event::InboundConnectFailed { error }) => { - self.queued_events.push_back(ToSwarm::GenerateEvent(Event { - remote_peer_id: event_source, - connection_id: relayed_connection_id, - result: Err(Error::Inbound(error)), - })) + log::debug!("Failed to perform DCUtR handshake: {error}"); } Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { log::debug!( diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index b607764fbd5..bb23aa0058b 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -50,7 +50,6 @@ pub enum Command { #[derive(Debug)] pub enum Event { InboundConnectNegotiated { remote_addrs: Vec }, - InboundConnectFailed { error: inbound::Error }, OutboundConnectNegotiated { remote_addrs: Vec }, OutboundConnectFailed { error: outbound::Error }, } @@ -235,55 +234,53 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Return queued events. - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event); - } - - match self.inbound_stream.poll_unpin(cx) { - Poll::Ready(Ok(Ok(addresses))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectNegotiated { - remote_addrs: addresses, - }, - )) - } - Poll::Ready(Ok(Err(error))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectFailed { error }, - )) + loop { + // Return queued events. + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectFailed { - error: inbound::Error::Io(io::ErrorKind::TimedOut.into()), - }, - )) - } - Poll::Pending => {} - } - match self.outbound_stream.poll_unpin(cx) { - Poll::Ready(Ok(Ok(addresses))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectNegotiated { - remote_addrs: addresses, - }, - )) - } - Poll::Ready(Ok(Err(error))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectFailed { error }, - )) + match self.inbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + log::debug!("Inbound stream failed: {error}"); + continue; + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + log::debug!("Inbound stream timed out"); + continue; + } + Poll::Pending => {} } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectFailed { - error: outbound::Error::Io(io::ErrorKind::TimedOut.into()), - }, - )) + + match self.outbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { error }, + )) + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { + error: outbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) + } + Poll::Pending => break, } - Poll::Pending => {} } Poll::Pending diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index 95665843724..ef237e243f6 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -88,7 +88,7 @@ pub(crate) async fn handshake( } #[derive(Debug, Error)] -pub enum Error { +pub(crate) enum Error { #[error("IO error")] Io(#[from] io::Error), #[error("Protocol error")] From ce83f60e72d260da8602d5ecf716613f0719b65b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:50:05 +1100 Subject: [PATCH 09/15] Re-format changelog --- protocols/dcutr/CHANGELOG.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 179db86dff2..2b45ede9d32 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -1,10 +1,7 @@ ## 0.11.0 - unreleased - Add `ConnectionId` to `Event::DirectConnectionUpgradeSucceeded` and `Event::DirectConnectionUpgradeFailed`. - See [PR 4558]. - -[PR 4558]: https://github.com/libp2p/rust-libp2p/pull/4558 - + See [PR 4558](https://github.com/libp2p/rust-libp2p/pull/4558). - Exchange address _candidates_ instead of external addresses in `CONNECT`. If hole-punching wasn't working properly for you until now, this might be the reason why. See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624). From b157f01ad2b07edaa68027dfae20e167b917e939 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 18:51:10 +1100 Subject: [PATCH 10/15] Add changelog entry --- protocols/dcutr/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 2b45ede9d32..cb84020ec5f 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -5,6 +5,9 @@ - Exchange address _candidates_ instead of external addresses in `CONNECT`. If hole-punching wasn't working properly for you until now, this might be the reason why. See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624). +- Simplify public API. + We now only emit a single event: whether the hole-punch was successful or not. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). ## 0.10.0 From 17036b52c79fb06c23d0a15c84683cae7fd3ea9e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 19:03:08 +1100 Subject: [PATCH 11/15] Move connection ID into `Ok` Only makes sense when we succeeded. --- protocols/dcutr/src/behaviour.rs | 7 ++----- protocols/dcutr/tests/lib.rs | 3 +-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 01eca318b33..e76473ebce2 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -46,8 +46,7 @@ pub(crate) const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; #[derive(Debug)] pub struct Event { pub remote_peer_id: PeerId, - pub connection_id: ConnectionId, - pub result: Result<(), Error>, + pub result: Result, } #[derive(Debug, Error)] @@ -128,7 +127,6 @@ impl Behaviour { } else { self.queued_events.extend([ToSwarm::GenerateEvent(Event { remote_peer_id: peer_id, - connection_id: failed_direct_connection, result: Err(Error { attempts: MAX_NUMBER_OF_UPGRADE_ATTEMPTS, }), @@ -234,8 +232,7 @@ impl NetworkBehaviour for Behaviour { self.queued_events.extend([ToSwarm::GenerateEvent(Event { remote_peer_id: peer, - connection_id, - result: Ok(()), + result: Ok(connection_id), })]); } diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 1bd40f2d522..8058808e663 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -91,8 +91,7 @@ async fn connect() { let reported_conn_id = src .wait(move |e| match e { SwarmEvent::Behaviour(ClientEvent::Dcutr(dcutr::Event { - connection_id, - result: Ok(()), + result: Ok(connection_id), .. })) => Some(connection_id), _ => None, From 32ccb4d6ec381d7e75ad81e4dd71491cbdd5a43d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 19:09:10 +1100 Subject: [PATCH 12/15] Report an error for outbound stream failures --- protocols/dcutr/src/behaviour.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index e76473ebce2..ef7010a3d13 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -20,7 +20,7 @@ //! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. -use crate::handler; +use crate::{handler, protocol}; use either::Either; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Protocol; @@ -50,9 +50,17 @@ pub struct Event { } #[derive(Debug, Error)] -#[error("Failed to hole-punch after {attempts} attempts")] +#[error("Failed to hole-punch connection: {inner}")] pub struct Error { - attempts: u8, + inner: InnerError, +} + +#[derive(Debug, Error)] +enum InnerError { + #[error("Giving up after {0} dial attempts")] + AttemptsExceeded(u8), + #[error("Stream error: {0}")] + OutboundError(protocol::outbound::Error), } pub struct Behaviour { @@ -128,7 +136,7 @@ impl Behaviour { self.queued_events.extend([ToSwarm::GenerateEvent(Event { remote_peer_id: peer_id, result: Err(Error { - attempts: MAX_NUMBER_OF_UPGRADE_ATTEMPTS, + inner: InnerError::AttemptsExceeded(MAX_NUMBER_OF_UPGRADE_ATTEMPTS), }), })]); } @@ -274,7 +282,14 @@ impl NetworkBehaviour for Behaviour { self.queued_events.push_back(ToSwarm::Dial { opts }); } Either::Left(handler::relayed::Event::OutboundConnectFailed { error }) => { - log::debug!("Failed to perform DCUtR handshake: {error}"); + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + result: Err(Error { + inner: InnerError::OutboundError(error), + }), + })); + + // Maybe treat these as transient and retry? } Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { log::debug!( From e341e760f0df80eb3abc9a51ab4c3bde40eb5148 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 19:11:04 +1100 Subject: [PATCH 13/15] Fix compile errors --- hole-punching-tests/src/main.rs | 3 +-- misc/metrics/src/dcutr.rs | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index 1c63e530fed..88a202ea36a 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -104,8 +104,7 @@ async fn main() -> Result<()> { ( SwarmEvent::Behaviour(BehaviourEvent::Dcutr(dcutr::Event { remote_peer_id, - connection_id, - result: Ok(()), + result: Ok(connection_id), })), _, ) => { diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index 04b2506edba..3e60dca2cab 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -58,12 +58,10 @@ impl From<&libp2p_dcutr::Event> for EventType { match event { libp2p_dcutr::Event { remote_peer_id: _, - connection_id: _, - result: Ok(()), + result: Ok(_), } => EventType::DirectConnectionUpgradeSucceeded, libp2p_dcutr::Event { remote_peer_id: _, - connection_id: _, result: Err(_), } => EventType::DirectConnectionUpgradeFailed, } From e10ef0e26ac70b63b67182c715634535bb3a1f9c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 19:14:05 +1100 Subject: [PATCH 14/15] Also report inbound stream errors --- protocols/dcutr/src/behaviour.rs | 12 +++- protocols/dcutr/src/handler/relayed.rs | 89 +++++++++++++------------ protocols/dcutr/src/protocol/inbound.rs | 2 +- 3 files changed, 58 insertions(+), 45 deletions(-) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index ef7010a3d13..6aecc596c71 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -59,7 +59,9 @@ pub struct Error { enum InnerError { #[error("Giving up after {0} dial attempts")] AttemptsExceeded(u8), - #[error("Stream error: {0}")] + #[error("Inbound stream error: {0}")] + InboundError(protocol::inbound::Error), + #[error("Outbound stream error: {0}")] OutboundError(protocol::outbound::Error), } @@ -281,6 +283,14 @@ impl NetworkBehaviour for Behaviour { .insert(maybe_direct_connection_id, relayed_connection_id); self.queued_events.push_back(ToSwarm::Dial { opts }); } + Either::Left(handler::relayed::Event::InboundConnectFailed { error }) => { + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + result: Err(Error { + inner: InnerError::InboundError(error), + }), + })); + } Either::Left(handler::relayed::Event::OutboundConnectFailed { error }) => { self.queued_events.push_back(ToSwarm::GenerateEvent(Event { remote_peer_id: event_source, diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index bb23aa0058b..9d600d234e5 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -51,6 +51,7 @@ pub enum Command { pub enum Event { InboundConnectNegotiated { remote_addrs: Vec }, OutboundConnectNegotiated { remote_addrs: Vec }, + InboundConnectFailed { error: inbound::Error }, OutboundConnectFailed { error: outbound::Error }, } @@ -234,53 +235,55 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - loop { - // Return queued events. - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event); - } + // Return queued events. + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } - match self.inbound_stream.poll_unpin(cx) { - Poll::Ready(Ok(Ok(addresses))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundConnectNegotiated { - remote_addrs: addresses, - }, - )) - } - Poll::Ready(Ok(Err(error))) => { - log::debug!("Inbound stream failed: {error}"); - continue; - } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - log::debug!("Inbound stream timed out"); - continue; - } - Poll::Pending => {} + match self.inbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectFailed { error }, + )) } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundConnectFailed { + error: inbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) + } + Poll::Pending => {} + } - match self.outbound_stream.poll_unpin(cx) { - Poll::Ready(Ok(Ok(addresses))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectNegotiated { - remote_addrs: addresses, - }, - )) - } - Poll::Ready(Ok(Err(error))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectFailed { error }, - )) - } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundConnectFailed { - error: outbound::Error::Io(io::ErrorKind::TimedOut.into()), - }, - )) - } - Poll::Pending => break, + match self.outbound_stream.poll_unpin(cx) { + Poll::Ready(Ok(Ok(addresses))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiated { + remote_addrs: addresses, + }, + )) + } + Poll::Ready(Ok(Err(error))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { error }, + )) + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectFailed { + error: outbound::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )) } + Poll::Pending => {} } Poll::Pending diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index ef237e243f6..95665843724 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -88,7 +88,7 @@ pub(crate) async fn handshake( } #[derive(Debug, Error)] -pub(crate) enum Error { +pub enum Error { #[error("IO error")] Io(#[from] io::Error), #[error("Protocol error")] From d591784b9870633fae8e436436a9d36417968f63 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 27 Oct 2023 19:21:47 +1100 Subject: [PATCH 15/15] FIx hole-punch docs --- libp2p/src/tutorials/hole_punching.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/libp2p/src/tutorials/hole_punching.rs b/libp2p/src/tutorials/hole_punching.rs index 5fd74fe754e..f9f42432ba4 100644 --- a/libp2p/src/tutorials/hole_punching.rs +++ b/libp2p/src/tutorials/hole_punching.rs @@ -166,18 +166,9 @@ //! [2022-01-30T12:54:10Z INFO client] Established connection to PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X") via Dialer { address: "/ip4/$RELAY_PEER_ID/tcp/4001/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN/p2p-circuit/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X", role_override: Dialer } //! ``` //! -//! 2. The listening client initiating a direct connection upgrade for the new relayed connection. -//! Reported by [`dcutr`](crate::dcutr) through -//! [`Event::RemoteInitiatedDirectConnectionUpgrade`](crate::dcutr::Event::RemoteInitiatedDirectConnectionUpgrade). +//! 2. The direct connection upgrade, also known as hole punch, succeeding. +//! Reported by [`dcutr`](crate::dcutr) through [`Event`](crate::dcutr::Event) containing [`Result::Ok`] with the [`ConnectionId`](libp2p_swarm::ConnectionId) of the new direct connection. //! //! ``` ignore -//! [2022-01-30T12:54:11Z INFO client] RemoteInitiatedDirectConnectionUpgrade { remote_peer_id: PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X"), remote_relayed_addr: "/ip4/$RELAY_PEER_ID/tcp/4001/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN/p2p-circuit/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X" } -//! ``` -//! -//! 3. The direct connection upgrade, also known as hole punch, succeeding. Reported by -//! [`dcutr`](crate::dcutr) through -//! [`Event::RemoteInitiatedDirectConnectionUpgrade`](crate::dcutr::Event::DirectConnectionUpgradeSucceeded). -//! -//! ``` ignore -//! [2022-01-30T12:54:11Z INFO client] DirectConnectionUpgradeSucceeded { remote_peer_id: PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X") } +//! [2022-01-30T12:54:11Z INFO client] Event { remote_peer_id: PeerId("12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X"), result: Ok(2) } //! ```