diff --git a/Cargo.lock b/Cargo.lock index 79856edf267..7d3f2e5b07a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1581,7 +1581,7 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.2.0" +version = "0.2.1" dependencies = [ "futures-timer", "futures-util", @@ -1939,6 +1939,7 @@ name = "hole-punching-tests" version = "0.1.0" dependencies = [ "anyhow", + "either", "env_logger 0.10.0", "futures", "libp2p", @@ -2946,6 +2947,7 @@ dependencies = [ "libp2p-ping", "libp2p-plaintext", "libp2p-swarm", + "libp2p-swarm-test", "libp2p-yamux", "log", "quick-protobuf", diff --git a/Cargo.toml b/Cargo.toml index 2c823756bbe..e7044a185bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ resolver = "2" rust-version = "1.73.0" [workspace.dependencies] -futures-bounded = { version = "0.2.0", path = "misc/futures-bounded" } +futures-bounded = { version = "0.2.1", path = "misc/futures-bounded" } libp2p = { version = "0.53.0", path = "libp2p" } libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" } diff --git a/hole-punching-tests/Cargo.toml b/hole-punching-tests/Cargo.toml index d8cf5c1ef71..4d067117260 100644 --- a/hole-punching-tests/Cargo.toml +++ b/hole-punching-tests/Cargo.toml @@ -15,3 +15,4 @@ redis = { version = "0.23.0", default-features = false, features = ["tokio-comp" tokio = { version = "1.29.1", features = ["full"] } serde = { version = "1.0.190", features = ["derive"] } serde_json = "1.0.107" +either = "1.9.0" diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs index fd4616e1629..f7373aa4f94 100644 --- a/hole-punching-tests/src/main.rs +++ b/hole-punching-tests/src/main.rs @@ -19,7 +19,11 @@ // DEALINGS IN THE SOFTWARE. use anyhow::{Context, Result}; +use either::Either; use futures::stream::StreamExt; +use libp2p::core::transport::ListenerId; +use libp2p::swarm::dial_opts::DialOpts; +use libp2p::swarm::ConnectionId; use libp2p::{ core::multiaddr::{Multiaddr, Protocol}, dcutr, identify, noise, ping, relay, @@ -83,17 +87,22 @@ async fn main() -> Result<()> { .build(); client_listen_on_transport(&mut swarm, transport).await?; - client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?; + let id = client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?; let mut hole_punched_peer_connection = None; loop { - match (swarm.next().await.unwrap(), hole_punched_peer_connection) { + match ( + swarm.next().await.unwrap(), + hole_punched_peer_connection, + id, + ) { ( SwarmEvent::Behaviour(BehaviourEvent::RelayClient( relay::client::Event::ReservationReqAccepted { .. }, )), _, + _, ) => { log::info!("Relay accepted our reservation request."); @@ -109,6 +118,7 @@ async fn main() -> Result<()> { }, )), _, + _, ) => { log::info!("Successfully hole-punched to {remote_peer_id}"); @@ -121,6 +131,7 @@ async fn main() -> Result<()> { .. })), Some(hole_punched_connection), + _, ) if mode == Mode::Dial && connection == hole_punched_connection => { println!("{}", serde_json::to_string(&Report::new(rtt))?); @@ -135,12 +146,32 @@ async fn main() -> Result<()> { }, )), _, + _, ) => { log::info!("Failed to hole-punched to {remote_peer_id}"); return Err(anyhow::Error::new(error)); } - (SwarmEvent::OutgoingConnectionError { error, .. }, _) => { - anyhow::bail!(error) + ( + SwarmEvent::ListenerClosed { + listener_id, + reason: Err(e), + .. + }, + _, + Either::Left(reservation), + ) if listener_id == reservation => { + anyhow::bail!("Reservation on relay failed: {e}"); + } + ( + SwarmEvent::OutgoingConnectionError { + connection_id, + error, + .. + }, + _, + Either::Right(circuit), + ) if connection_id == circuit => { + anyhow::bail!("Circuit request relay failed: {error}"); } _ => {} } @@ -209,23 +240,30 @@ async fn client_setup( redis: &mut RedisClient, relay_addr: Multiaddr, mode: Mode, -) -> Result<()> { - match mode { +) -> Result> { + let either = match mode { Mode::Listen => { - swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?; + let id = swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?; + + Either::Left(id) } Mode::Dial => { let remote_peer_id = redis.pop(LISTEN_CLIENT_PEER_ID).await?; - swarm.dial( + let opts = DialOpts::from( relay_addr .with(Protocol::P2pCircuit) .with(Protocol::P2p(remote_peer_id)), - )?; + ); + let id = opts.connection_id(); + + swarm.dial(opts)?; + + Either::Right(id) } }; - Ok(()) + Ok(either) } fn tcp_addr(addr: IpAddr) -> Multiaddr { diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md index 90bd47f2f61..9801c9c1498 100644 --- a/misc/futures-bounded/CHANGELOG.md +++ b/misc/futures-bounded/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.2.1 - unreleased + +- Add `.len()` getter to `FuturesMap`, `FuturesSet`, `StreamMap` and `StreamSet`. + See [PR 4745](https://github.com/libp2p/rust-lib2pp/pulls/4745). + ## 0.2.0 - Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`. diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml index 7689e9bdcbc..7b622374b43 100644 --- a/misc/futures-bounded/Cargo.toml +++ b/misc/futures-bounded/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-bounded" -version = "0.2.0" +version = "0.2.1" edition = "2021" rust-version.workspace = true license = "MIT" diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs index 5fd06037608..8e8802254bc 100644 --- a/misc/futures-bounded/src/futures_map.rs +++ b/misc/futures-bounded/src/futures_map.rs @@ -84,6 +84,10 @@ where } } + pub fn len(&self) -> usize { + self.inner.len() + } + pub fn is_empty(&self) -> bool { self.inner.is_empty() } diff --git a/misc/futures-bounded/src/futures_set.rs b/misc/futures-bounded/src/futures_set.rs index 79a82fde110..ea8f700991d 100644 --- a/misc/futures-bounded/src/futures_set.rs +++ b/misc/futures-bounded/src/futures_set.rs @@ -42,6 +42,10 @@ impl FuturesSet { } } + pub fn len(&self) -> usize { + self.inner.len() + } + pub fn is_empty(&self) -> bool { self.inner.is_empty() } diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index 7fcdd15e132..40294ce0fba 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -88,6 +88,10 @@ where Some(inner) } + pub fn len(&self) -> usize { + self.inner.len() + } + pub fn is_empty(&self) -> bool { self.inner.is_empty() } @@ -256,7 +260,7 @@ mod tests { assert!(poll.is_pending()); assert_eq!( - streams.inner.len(), + streams.len(), 0, "resources of cancelled streams are cleaned up properly" ); diff --git a/misc/futures-bounded/src/stream_set.rs b/misc/futures-bounded/src/stream_set.rs index 4fcb649fd49..bb32835065f 100644 --- a/misc/futures-bounded/src/stream_set.rs +++ b/misc/futures-bounded/src/stream_set.rs @@ -44,6 +44,10 @@ where } } + pub fn len(&self) -> usize { + self.inner.len() + } + pub fn is_empty(&self) -> bool { self.inner.is_empty() } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index d26660f14b4..20d8370cf6d 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,6 +2,17 @@ - Fix a rare race condition when making a reservation on a relay that could lead to a failed reservation. See [PR 4747](https://github.com/libp2p/rust-lib2pp/pulls/4747). +- Propagate errors of relay client to the listener / dialer. + A failed reservation will now appear as `SwarmEvent::ListenerClosed` with the `ListenerId` of the corresponding `Swarm::listen_on` call. + A failed circuit request will now appear as `SwarmEvent::OutgoingConnectionError` with the `ConnectionId` of the corresponding `Swarm::dial` call. + Lastly, a failed reservation or circuit request will **no longer** close the underlying relay connection. + As a result, we remove the following enum variants: + - `relay::client::Event::ReservationReqFailed` + - `relay::client::Event::OutboundCircuitReqFailed` + - `relay::client::Event::InboundCircuitReqDenied` + - `relay::client::Event::InboundCircuitReqDenyFailed` + + See [PR 4745](https://github.com/libp2p/rust-lib2pp/pulls/4745). ## 0.16.2 diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index f83cb9c4a80..bca55217a2a 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -37,6 +37,7 @@ libp2p-plaintext = { workspace = true } libp2p-swarm = { workspace = true, features = ["macros", "async-std"] } libp2p-yamux = { workspace = true } quickcheck = { workspace = true } +libp2p-swarm-test = { workspace = true } # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 39ccd539838..09d326be9fb 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -47,15 +47,12 @@ pub mod inbound { pub mod hop { pub use crate::protocol::inbound_hop::FatalUpgradeError; } - pub mod stop { - pub use crate::protocol::inbound_stop::FatalUpgradeError; - } } /// Types related to the relay protocol outbound. pub mod outbound { pub mod hop { - pub use crate::protocol::outbound_hop::FatalUpgradeError; + pub use crate::protocol::outbound_hop::{ConnectError, ProtocolViolation, ReserveError}; } pub mod stop { pub use crate::protocol::outbound_stop::FatalUpgradeError; diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index b15b3d68ae1..ae2ceb2e97d 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -25,7 +25,7 @@ pub(crate) mod transport; use crate::multiaddr_ext::MultiaddrExt; use crate::priv_client::handler::Handler; -use crate::protocol::{self, inbound_stop, outbound_hop}; +use crate::protocol::{self, inbound_stop}; use bytes::Bytes; use either::Either; use futures::channel::mpsc::Receiver; @@ -39,8 +39,7 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour, - NotifyHandler, Stream, StreamUpgradeError, THandler, THandlerInEvent, THandlerOutEvent, - ToSwarm, + NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -59,32 +58,15 @@ pub enum Event { renewal: bool, limit: Option, }, - ReservationReqFailed { - relay_peer_id: PeerId, - /// Indicates whether the request replaces an existing reservation. - renewal: bool, - error: StreamUpgradeError, - }, OutboundCircuitEstablished { relay_peer_id: PeerId, limit: Option, }, - OutboundCircuitReqFailed { - relay_peer_id: PeerId, - error: StreamUpgradeError, - }, /// An inbound circuit has been established. InboundCircuitEstablished { src_peer_id: PeerId, limit: Option, }, - /// An inbound circuit request has been denied. - InboundCircuitReqDenied { src_peer_id: PeerId }, - /// Denying an inbound circuit request failed. - InboundCircuitReqDenyFailed { - src_peer_id: PeerId, - error: inbound_stop::UpgradeError, - }, } /// [`NetworkBehaviour`] implementation of the relay client @@ -252,32 +234,15 @@ impl NetworkBehaviour for Behaviour { limit, } } - handler::Event::ReservationReqFailed { renewal, error } => { - Event::ReservationReqFailed { - relay_peer_id: event_source, - renewal, - error, - } - } handler::Event::OutboundCircuitEstablished { limit } => { Event::OutboundCircuitEstablished { relay_peer_id: event_source, limit, } } - handler::Event::OutboundCircuitReqFailed { error } => Event::OutboundCircuitReqFailed { - relay_peer_id: event_source, - error, - }, handler::Event::InboundCircuitEstablished { src_peer_id, limit } => { Event::InboundCircuitEstablished { src_peer_id, limit } } - handler::Event::InboundCircuitReqDenied { src_peer_id } => { - Event::InboundCircuitReqDenied { src_peer_id } - } - handler::Event::InboundCircuitReqDenyFailed { src_peer_id, error } => { - Event::InboundCircuitReqDenyFailed { src_peer_id, error } - } }; self.queued_actions.push_back(ToSwarm::GenerateEvent(event)) @@ -336,7 +301,7 @@ impl NetworkBehaviour for Behaviour { peer_id: relay_peer_id, handler: NotifyHandler::One(*connection_id), event: Either::Left(handler::In::EstablishCircuit { - send_back, + to_dial: send_back, dst_peer_id, }), }, @@ -350,7 +315,7 @@ impl NetworkBehaviour for Behaviour { self.pending_handler_commands.insert( connection_id, handler::In::EstablishCircuit { - send_back, + to_dial: send_back, dst_peer_id, }, ); diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 66bbc5896b1..b3fb345e215 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -20,14 +20,10 @@ use crate::priv_client::transport; use crate::protocol::{self, inbound_stop, outbound_hop}; -use crate::{proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; +use crate::{priv_client, proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; use either::Either; use futures::channel::{mpsc, oneshot}; -use futures::future::{BoxFuture, FutureExt}; -use futures::sink::SinkExt; -use futures::stream::{FuturesUnordered, StreamExt}; -use futures::TryFutureExt; -use futures_bounded::{PushError, Timeout}; +use futures::future::FutureExt; use futures_timer::Delay; use libp2p_core::multiaddr::Protocol; use libp2p_core::upgrade::ReadyUpgrade; @@ -42,9 +38,9 @@ use libp2p_swarm::{ }; use log::debug; use std::collections::VecDeque; -use std::fmt; use std::task::{Context, Poll}; use std::time::Duration; +use std::{fmt, io}; /// The maximum number of circuits being denied concurrently. /// @@ -61,7 +57,7 @@ pub enum In { }, EstablishCircuit { dst_peer_id: PeerId, - send_back: oneshot::Sender>, + to_dial: oneshot::Sender>, }, } @@ -71,7 +67,7 @@ impl fmt::Debug for In { In::Reserve { to_listener: _ } => f.debug_struct("In::Reserve").finish(), In::EstablishCircuit { dst_peer_id, - send_back: _, + to_dial: _, } => f .debug_struct("In::EstablishCircuit") .field("dst_peer_id", dst_peer_id) @@ -87,40 +83,19 @@ pub enum Event { renewal: bool, limit: Option, }, - ReservationReqFailed { - /// Indicates whether the request replaces an existing reservation. - renewal: bool, - error: StreamUpgradeError, - }, /// An outbound circuit has been established. OutboundCircuitEstablished { limit: Option }, - OutboundCircuitReqFailed { - error: StreamUpgradeError, - }, /// An inbound circuit has been established. InboundCircuitEstablished { src_peer_id: PeerId, limit: Option, }, - /// An inbound circuit request has been denied. - InboundCircuitReqDenied { src_peer_id: PeerId }, - /// Denying an inbound circuit request failed. - InboundCircuitReqDenyFailed { - src_peer_id: PeerId, - error: inbound_stop::UpgradeError, - }, } pub struct Handler { local_peer_id: PeerId, remote_peer_id: PeerId, remote_addr: Multiaddr, - /// A pending fatal error that results in the connection being closed. - pending_error: Option< - StreamUpgradeError< - Either, - >, - >, /// Queue of events to return when polled. queued_events: VecDeque< @@ -132,29 +107,29 @@ pub struct Handler { >, >, - wait_for_outbound_stream: VecDeque, - outbound_circuits: futures_bounded::FuturesSet< - Result< - Either< - Result, - Result, outbound_hop::CircuitFailedReason>, - >, - outbound_hop::FatalUpgradeError, - >, - >, + /// We issue a stream upgrade for each pending request. + pending_requests: VecDeque, - reservation: Reservation, + /// A `RESERVE` request is in-flight for each item in this queue. + active_reserve_requests: VecDeque>, - open_circuit_futs: - futures_bounded::FuturesSet>, + inflight_reserve_requests: + futures_bounded::FuturesSet>, - circuit_deny_futs: futures_bounded::FuturesMap>, + /// A `CONNECT` request is in-flight for each item in this queue. + active_connect_requests: + VecDeque>>, - /// Futures that try to send errors to the transport. - /// - /// We may drop errors if this handler ends up in a terminal state (by returning - /// [`ConnectionHandlerEvent::Close`]). - send_error_futs: FuturesUnordered>, + inflight_outbound_connect_requests: + futures_bounded::FuturesSet>, + + inflight_inbound_circuit_requests: + futures_bounded::FuturesSet>, + + inflight_outbound_circuit_deny_requests: + futures_bounded::FuturesSet>, + + reservation: Reservation, } impl Handler { @@ -164,22 +139,26 @@ impl Handler { remote_peer_id, remote_addr, queued_events: Default::default(), - pending_error: Default::default(), - wait_for_outbound_stream: Default::default(), - outbound_circuits: futures_bounded::FuturesSet::new( + pending_requests: Default::default(), + active_reserve_requests: Default::default(), + inflight_reserve_requests: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), - reservation: Reservation::None, - open_circuit_futs: futures_bounded::FuturesSet::new( + inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), - circuit_deny_futs: futures_bounded::FuturesMap::new( + inflight_outbound_connect_requests: futures_bounded::FuturesSet::new( + STREAM_TIMEOUT, + MAX_CONCURRENT_STREAMS_PER_CONNECTION, + ), + inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new( DENYING_CIRCUIT_TIMEOUT, MAX_NUMBER_DENYING_CIRCUIT, ), - send_error_futs: Default::default(), + active_connect_requests: Default::default(), + reservation: Reservation::None, } } @@ -190,64 +169,46 @@ impl Handler { ::OutboundProtocol, >, ) { - let outbound_info = self.wait_for_outbound_stream.pop_front().expect( - "got a stream error without a pending connection command or a reserve listener", - ); - match outbound_info { - outbound_hop::OutboundStreamInfo::Reserve(mut to_listener) => { - let non_fatal_error = match error { - StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, - StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, - StreamUpgradeError::Io(e) => { - self.pending_error = Some(StreamUpgradeError::Io(e)); - return; + let pending_request = self + .pending_requests + .pop_front() + .expect("got a stream error without a pending request"); + + match pending_request { + PendingRequest::Reserve { mut to_listener } => { + let error = match error { + StreamUpgradeError::Timeout => { + outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()) } - StreamUpgradeError::Apply(v) => void::unreachable(v), + StreamUpgradeError::Apply(never) => void::unreachable(never), + StreamUpgradeError::NegotiationFailed => { + outbound_hop::ReserveError::Unsupported + } + StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e), }; - if self.pending_error.is_none() { - self.send_error_futs.push( - async move { - let _ = to_listener - .send(transport::ToListenerMsg::Reservation(Err(()))) - .await; - } - .boxed(), - ); - } else { - // Fatal error occurred, thus handler is closing as quickly as possible. - // Transport is notified through dropping `to_listener`. + if let Err(e) = + to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error))) + { + log::debug!("Unable to send error to listener: {}", e.into_send_error()) } - - let renewal = self.reservation.failed(); - - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::ReservationReqFailed { - renewal, - error: non_fatal_error, - }, - )); + self.reservation.failed(); } - outbound_hop::OutboundStreamInfo::CircuitConnection(cmd) => { - let non_fatal_error = match error { - StreamUpgradeError::Timeout => StreamUpgradeError::Timeout, - StreamUpgradeError::NegotiationFailed => StreamUpgradeError::NegotiationFailed, - StreamUpgradeError::Io(e) => { - self.pending_error = Some(StreamUpgradeError::Io(e)); - return; + PendingRequest::Connect { + to_dial: send_back, .. + } => { + let error = match error { + StreamUpgradeError::Timeout => { + outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into()) + } + StreamUpgradeError::NegotiationFailed => { + outbound_hop::ConnectError::Unsupported } + StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e), StreamUpgradeError::Apply(v) => void::unreachable(v), }; - let _ = cmd.send_back.send(Err(())); - - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundCircuitReqFailed { - error: non_fatal_error, - }, - )); + let _ = send_back.send(Err(error)); } } } @@ -255,17 +216,14 @@ impl Handler { fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) { let src_peer_id = circuit.src_peer_id(); - match self.circuit_deny_futs.try_push( - src_peer_id, - circuit.deny(proto::Status::NO_RESERVATION), - ) { - Err(PushError::BeyondCapacity(_)) => log::warn!( - "Dropping inbound circuit request to be denied from {src_peer_id} due to exceeding limit." - ), - Err(PushError::Replaced(_)) => log::warn!( + if self + .inflight_outbound_circuit_deny_requests + .try_push(circuit.deny(proto::Status::NO_RESERVATION)) + .is_err() + { + log::warn!( "Dropping existing inbound circuit request to be denied from {src_peer_id} in favor of new one." - ), - Ok(()) => {} + ) } } } @@ -274,7 +232,7 @@ impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; type Error = StreamUpgradeError< - Either, + Either, >; type InboundProtocol = ReadyUpgrade; type InboundOpenInfo = (); @@ -288,22 +246,21 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { match event { In::Reserve { to_listener } => { - self.wait_for_outbound_stream - .push_back(outbound_hop::OutboundStreamInfo::Reserve(to_listener)); + self.pending_requests + .push_back(PendingRequest::Reserve { to_listener }); self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), }); } In::EstablishCircuit { - send_back, + to_dial: send_back, dst_peer_id, } => { - self.wait_for_outbound_stream.push_back( - outbound_hop::OutboundStreamInfo::CircuitConnection( - outbound_hop::Command::new(dst_peer_id, send_back), - ), - ); + self.pending_requests.push_back(PendingRequest::Connect { + dst_peer_id, + to_dial: send_back, + }); self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), @@ -327,21 +284,25 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - - // Inbound circuits loop { - match self.outbound_circuits.poll_unpin(cx) { - Poll::Ready(Ok(Ok(Either::Left(Ok(outbound_hop::Reservation { + debug_assert_eq!( + self.inflight_reserve_requests.len(), + self.active_reserve_requests.len(), + "expect to have one active request per inflight stream" + ); + + // Reservations + match self.inflight_reserve_requests.poll_unpin(cx) { + Poll::Ready(Ok(Ok(outbound_hop::Reservation { renewal_timeout, addrs, limit, - to_listener, - }))))) => { + }))) => { + let to_listener = self + .active_reserve_requests + .pop_front() + .expect("must have active request for stream"); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( self.reservation.accepted( renewal_timeout, @@ -350,57 +311,110 @@ impl ConnectionHandler for Handler { self.local_peer_id, limit, ), - )) - } - Poll::Ready(Ok(Ok(Either::Right(Ok(Some(outbound_hop::Circuit { limit })))))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundCircuitEstablished { limit }, )); } - Poll::Ready(Ok(Ok(Either::Right(Ok(None))))) => continue, - Poll::Ready(Ok(Ok(Either::Right(Err(e))))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundCircuitReqFailed { - error: StreamUpgradeError::Apply(e), - }, - )); + Poll::Ready(Ok(Err(error))) => { + let mut to_listener = self + .active_reserve_requests + .pop_front() + .expect("must have active request for stream"); + + if let Err(e) = + to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error))) + { + log::debug!("Unable to send error to listener: {}", e.into_send_error()) + } + self.reservation.failed(); + continue; } - Poll::Ready(Ok(Ok(Either::Left(Err(e))))) => { - let renewal = self.reservation.failed(); + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + let mut to_listener = self + .active_reserve_requests + .pop_front() + .expect("must have active request for stream"); + + if let Err(e) = + to_listener.try_send(transport::ToListenerMsg::Reservation(Err( + outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()), + ))) + { + log::debug!("Unable to send error to listener: {}", e.into_send_error()) + } + self.reservation.failed(); + continue; + } + Poll::Pending => {} + } + + debug_assert_eq!( + self.inflight_outbound_connect_requests.len(), + self.active_connect_requests.len(), + "expect to have one active request per inflight stream" + ); + + // Circuits + match self.inflight_outbound_connect_requests.poll_unpin(cx) { + Poll::Ready(Ok(Ok(outbound_hop::Circuit { + limit, + read_buffer, + stream, + }))) => { + let to_listener = self + .active_connect_requests + .pop_front() + .expect("must have active request for stream"); + + if to_listener + .send(Ok(priv_client::Connection { + state: priv_client::ConnectionState::new_outbound(stream, read_buffer), + })) + .is_err() + { + log::debug!( + "Dropping newly established circuit because the listener is gone" + ); + continue; + } + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::ReservationReqFailed { - renewal, - error: StreamUpgradeError::Apply(e), - }, + Event::OutboundCircuitEstablished { limit }, )); } - Poll::Ready(Ok(Err(e))) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( - Either::Right(e), - ))) + Poll::Ready(Ok(Err(error))) => { + let to_dialer = self + .active_connect_requests + .pop_front() + .expect("must have active request for stream"); + + let _ = to_dialer.send(Err(error)); + continue; } - Poll::Ready(Err(Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Timeout)); + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + let mut to_listener = self + .active_reserve_requests + .pop_front() + .expect("must have active request for stream"); + + if let Err(e) = + to_listener.try_send(transport::ToListenerMsg::Reservation(Err( + outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()), + ))) + { + log::debug!("Unable to send error to listener: {}", e.into_send_error()) + } + self.reservation.failed(); + continue; } - Poll::Pending => break, + Poll::Pending => {} } - } - // Return queued events. - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event); - } - - if let Poll::Ready(worker_res) = self.open_circuit_futs.poll_unpin(cx) { - let res = match worker_res { - Ok(r) => r, - Err(Timeout { .. }) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Timeout)); - } - }; + // Return queued events. + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } - match res { - Ok(circuit) => match &mut self.reservation { + match self.inflight_inbound_circuit_requests.poll_unpin(cx) { + Poll::Ready(Ok(Ok(circuit))) => match &mut self.reservation { Reservation::Accepted { pending_msgs, .. } | Reservation::Renewing { pending_msgs, .. } => { let src_peer_id = circuit.src_peer_id(); @@ -422,47 +436,45 @@ impl ConnectionHandler for Handler { } Reservation::None => { self.insert_to_deny_futs(circuit); + continue; } }, - Err(e) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( - Either::Left(e), - ))); + Poll::Ready(Ok(Err(e))) => { + log::debug!("An inbound circuit request failed: {e}"); + continue; } + Poll::Ready(Err(e)) => { + log::debug!("An inbound circuit request timed out: {e}"); + continue; + } + Poll::Pending => {} } - } - - if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) { - self.wait_for_outbound_stream - .push_back(outbound_hop::OutboundStreamInfo::Reserve(to_listener)); - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), - }); - } + if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) { + self.pending_requests + .push_back(PendingRequest::Reserve { to_listener }); - // Deny incoming circuit requests. - match self.circuit_deny_futs.poll_unpin(cx) { - Poll::Ready((src_peer_id, Ok(Ok(())))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundCircuitReqDenied { src_peer_id }, - )); - } - Poll::Ready((src_peer_id, Ok(Err(error)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundCircuitReqDenyFailed { src_peer_id, error }, - )); - } - Poll::Ready((src_peer_id, Err(Timeout { .. }))) => { - log::warn!("Dropping inbound circuit request to be denied from {:?} due to exceeding limit.", src_peer_id); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()), + }); } - Poll::Pending => {} - } - // Send errors to transport. - while let Poll::Ready(Some(())) = self.send_error_futs.poll_next_unpin(cx) {} + // Deny incoming circuit requests. + match self.inflight_outbound_circuit_deny_requests.poll_unpin(cx) { + Poll::Ready(Ok(Ok(()))) => continue, + Poll::Ready(Ok(Err(error))) => { + log::debug!("Denying inbound circuit failed: {error}"); + continue; + } + Poll::Ready(Err(futures_bounded::Timeout { .. })) => { + log::debug!("Denying inbound circuit timed out"); + continue; + } + Poll::Pending => {} + } - Poll::Pending + return Poll::Pending; + } } fn on_connection_event( @@ -480,7 +492,7 @@ impl ConnectionHandler for Handler { .. }) => { if self - .open_circuit_futs + .inflight_inbound_circuit_requests .try_push(inbound_stop::handle_open_circuit(stream)) .is_err() { @@ -491,33 +503,29 @@ impl ConnectionHandler for Handler { protocol: stream, .. }) => { - let outbound_info = self.wait_for_outbound_stream.pop_front().expect( + let pending_request = self.pending_requests.pop_front().expect( "opened a stream without a pending connection command or a reserve listener", ); - match outbound_info { - outbound_hop::OutboundStreamInfo::Reserve(to_listener) => { + match pending_request { + PendingRequest::Reserve { to_listener } => { + self.active_reserve_requests.push_back(to_listener); if self - .outbound_circuits - .try_push( - outbound_hop::handle_reserve_message_response(stream, to_listener) - .map_ok(Either::Left), - ) + .inflight_reserve_requests + .try_push(outbound_hop::make_reservation(stream)) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") } } - outbound_hop::OutboundStreamInfo::CircuitConnection(cmd) => { + PendingRequest::Connect { + dst_peer_id, + to_dial: send_back, + } => { + self.active_connect_requests.push_back(send_back); + if self - .outbound_circuits - .try_push( - outbound_hop::handle_connection_message_response( - stream, - self.remote_peer_id, - cmd, - ) - .map_ok(Either::Right), - ) + .inflight_outbound_connect_requests + .try_push(outbound_hop::open_circuit(stream, dst_peer_id)) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") @@ -595,17 +603,8 @@ impl Reservation { } /// Marks the current reservation as failed. - /// - /// Returns whether the reservation request was a renewal. - fn failed(&mut self) -> bool { - let renewal = matches!( - self, - Reservation::Accepted { .. } | Reservation::Renewing { .. } - ); - + fn failed(&mut self) { *self = Reservation::None; - - renewal } fn forward_messages_to_transport_listener(&mut self, cx: &mut Context<'_>) { @@ -668,3 +667,15 @@ impl Reservation { poll_val } } + +pub(crate) enum PendingRequest { + Reserve { + /// A channel into the [`Transport`](priv_client::Transport). + to_listener: mpsc::Sender, + }, + Connect { + dst_peer_id: PeerId, + /// A channel into the future returned by [`Transport::dial`](libp2p_core::Transport::dial). + to_dial: oneshot::Sender>, + }, +} diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 41114d0cdd5..c463de9cc66 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -21,6 +21,8 @@ use crate::multiaddr_ext::MultiaddrExt; use crate::priv_client::Connection; +use crate::protocol::outbound_hop; +use crate::protocol::outbound_hop::{ConnectError, ReserveError}; use crate::RequestId; use futures::channel::mpsc; use futures::channel::oneshot; @@ -97,7 +99,7 @@ pub struct Transport { impl Transport { pub(crate) fn new() -> (Self, mpsc::Receiver) { - let (to_behaviour, from_transport) = mpsc::channel(0); + let (to_behaviour, from_transport) = mpsc::channel(1000); let transport = Transport { to_behaviour, pending_to_behaviour: VecDeque::new(), @@ -189,7 +191,8 @@ impl libp2p_core::Transport for Transport { send_back: tx, }) .await?; - let stream = rx.await?.map_err(|()| Error::Connect)?; + let stream = rx.await??; + Ok(stream) } .boxed()) @@ -381,7 +384,7 @@ impl Stream for Listener { send_back_addr: Protocol::P2p(src_peer_id).into(), }) } - ToListenerMsg::Reservation(Err(())) => self.close(Err(Error::Reservation)), + ToListenerMsg::Reservation(Err(e)) => self.close(Err(Error::Reservation(e))), }; } } @@ -409,9 +412,9 @@ pub enum Error { #[error("One of the provided multiaddresses is malformed.")] MalformedMultiaddr, #[error("Failed to get Reservation.")] - Reservation, + Reservation(#[from] ReserveError), #[error("Failed to connect to destination.")] - Connect, + Connect(#[from] ConnectError), } impl From for TransportError { @@ -431,7 +434,7 @@ pub(crate) enum TransportToBehaviourMsg { relay_peer_id: PeerId, dst_addr: Option, dst_peer_id: PeerId, - send_back: oneshot::Sender>, + send_back: oneshot::Sender>, }, /// Listen for incoming relayed connections via relay node. ListenReq { @@ -443,7 +446,7 @@ pub(crate) enum TransportToBehaviourMsg { #[allow(clippy::large_enum_variant)] pub enum ToListenerMsg { - Reservation(Result), + Reservation(Result), IncomingRelayedConnection { stream: Connection, src_peer_id: PeerId, diff --git a/protocols/relay/src/protocol/inbound_stop.rs b/protocols/relay/src/protocol/inbound_stop.rs index caaeee9cc53..22b8244080f 100644 --- a/protocols/relay/src/protocol/inbound_stop.rs +++ b/protocols/relay/src/protocol/inbound_stop.rs @@ -25,9 +25,10 @@ use bytes::Bytes; use futures::prelude::*; use libp2p_identity::PeerId; use libp2p_swarm::Stream; +use std::io; use thiserror::Error; -pub(crate) async fn handle_open_circuit(io: Stream) -> Result { +pub(crate) async fn handle_open_circuit(io: Stream) -> Result { let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); let proto::StopMessage { @@ -38,40 +39,42 @@ pub(crate) async fn handle_open_circuit(io: Stream) -> Result { - let src_peer_id = PeerId::from_bytes(&peer.ok_or(FatalUpgradeError::MissingPeer)?.id) - .map_err(|_| FatalUpgradeError::ParsePeerId)?; + let src_peer_id = PeerId::from_bytes(&peer.ok_or(ProtocolViolation::MissingPeer)?.id) + .map_err(|_| ProtocolViolation::ParsePeerId)?; Ok(Circuit { substream, src_peer_id, limit: limit.map(Into::into), }) } - proto::StopMessageType::STATUS => Err(FatalUpgradeError::UnexpectedTypeStatus), + proto::StopMessageType::STATUS => { + Err(Error::Protocol(ProtocolViolation::UnexpectedTypeStatus)) + } } } #[derive(Debug, Error)] -pub enum UpgradeError { - #[error("Fatal")] - Fatal(#[from] FatalUpgradeError), +pub(crate) enum Error { + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), + #[error("IO error")] + Io(#[from] io::Error), } -impl From for UpgradeError { +impl From for Error { fn from(error: quick_protobuf_codec::Error) -> Self { - Self::Fatal(error.into()) + Self::Protocol(ProtocolViolation::Codec(error)) } } #[derive(Debug, Error)] -pub enum FatalUpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Failed to parse response type field.")] ParseTypeField, #[error("Failed to parse peer id.")] @@ -97,7 +100,7 @@ impl Circuit { self.limit } - pub(crate) async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> { + pub(crate) async fn accept(mut self) -> Result<(Stream, Bytes), Error> { let msg = proto::StopMessage { type_pb: proto::StopMessageType::STATUS, peer: None, @@ -121,7 +124,7 @@ impl Circuit { Ok((io, read_buffer.freeze())) } - pub(crate) async fn deny(mut self, status: proto::Status) -> Result<(), UpgradeError> { + pub(crate) async fn deny(mut self, status: proto::Status) -> Result<(), Error> { let msg = proto::StopMessage { type_pb: proto::StopMessageType::STATUS, peer: None, diff --git a/protocols/relay/src/protocol/outbound_hop.rs b/protocols/relay/src/protocol/outbound_hop.rs index 6a222db55c1..4e9b512c3e7 100644 --- a/protocols/relay/src/protocol/outbound_hop.rs +++ b/protocols/relay/src/protocol/outbound_hop.rs @@ -18,25 +18,24 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::io; use std::time::{Duration, SystemTime}; use asynchronous_codec::{Framed, FramedParts}; -use futures::channel::{mpsc, oneshot}; +use bytes::Bytes; use futures::prelude::*; use futures_timer::Delay; -use log::debug; use thiserror::Error; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::Stream; -use crate::priv_client::transport; use crate::protocol::{Limit, MAX_MESSAGE_SIZE}; -use crate::{priv_client, proto}; +use crate::{proto, HOP_PROTOCOL_NAME}; #[derive(Debug, Error)] -pub enum CircuitFailedReason { +pub enum ConnectError { #[error("Remote reported resource limit exceeded.")] ResourceLimitExceeded, #[error("Relay failed to connect to destination.")] @@ -45,22 +44,32 @@ pub enum CircuitFailedReason { NoReservation, #[error("Remote denied permission.")] PermissionDenied, + #[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")] + Unsupported, + #[error("IO error")] + Io(#[source] io::Error), + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), } #[derive(Debug, Error)] -pub enum ReservationFailedReason { +pub enum ReserveError { #[error("Reservation refused.")] Refused, #[error("Remote reported resource limit exceeded.")] ResourceLimitExceeded, + #[error("Remote does not support the `{HOP_PROTOCOL_NAME}` protocol")] + Unsupported, + #[error("IO error")] + Io(#[source] io::Error), + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), } #[derive(Debug, Error)] -pub enum FatalUpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Expected 'reservation' field to be set.")] @@ -83,21 +92,31 @@ pub enum FatalUpgradeError { UnexpectedStatus(proto::Status), } +impl From for ConnectError { + fn from(e: quick_protobuf_codec::Error) -> Self { + ConnectError::Protocol(ProtocolViolation::Codec(e)) + } +} + +impl From for ReserveError { + fn from(e: quick_protobuf_codec::Error) -> Self { + ReserveError::Protocol(ProtocolViolation::Codec(e)) + } +} + pub(crate) struct Reservation { pub(crate) renewal_timeout: Delay, pub(crate) addrs: Vec, pub(crate) limit: Option, - pub(crate) to_listener: mpsc::Sender, } pub(crate) struct Circuit { + pub(crate) stream: Stream, + pub(crate) read_buffer: Bytes, pub(crate) limit: Option, } -pub(crate) async fn handle_reserve_message_response( - protocol: Stream, - to_listener: mpsc::Sender, -) -> Result, FatalUpgradeError> { +pub(crate) async fn make_reservation(stream: Stream) -> Result { let msg = proto::HopMessage { type_pb: proto::HopMessageType::RESERVE, peer: None, @@ -105,7 +124,7 @@ pub(crate) async fn handle_reserve_message_response( limit: None, status: None, }; - let mut substream = Framed::new(protocol, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); + let mut substream = Framed::new(stream, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); substream.send(msg).await?; @@ -118,35 +137,47 @@ pub(crate) async fn handle_reserve_message_response( } = substream .next() .await - .ok_or(FatalUpgradeError::StreamClosed)??; + .ok_or(ReserveError::Io(io::ErrorKind::UnexpectedEof.into()))??; match type_pb { proto::HopMessageType::CONNECT => { - return Err(FatalUpgradeError::UnexpectedTypeConnect); + return Err(ReserveError::Protocol( + ProtocolViolation::UnexpectedTypeConnect, + )); } proto::HopMessageType::RESERVE => { - return Err(FatalUpgradeError::UnexpectedTypeReserve); + return Err(ReserveError::Protocol( + ProtocolViolation::UnexpectedTypeReserve, + )); } proto::HopMessageType::STATUS => {} } let limit = limit.map(Into::into); - match status.ok_or(FatalUpgradeError::MissingStatusField)? { + match status.ok_or(ProtocolViolation::MissingStatusField)? { proto::Status::OK => {} proto::Status::RESERVATION_REFUSED => { - return Ok(Err(ReservationFailedReason::Refused)); + return Err(ReserveError::Refused); } proto::Status::RESOURCE_LIMIT_EXCEEDED => { - return Ok(Err(ReservationFailedReason::ResourceLimitExceeded)); + return Err(ReserveError::ResourceLimitExceeded); + } + s => { + return Err(ReserveError::Protocol(ProtocolViolation::UnexpectedStatus( + s, + ))) } - s => return Err(FatalUpgradeError::UnexpectedStatus(s)), } - let reservation = reservation.ok_or(FatalUpgradeError::MissingReservationField)?; + let reservation = reservation.ok_or(ReserveError::Protocol( + ProtocolViolation::MissingReservationField, + ))?; if reservation.addrs.is_empty() { - return Err(FatalUpgradeError::NoAddressesInReservation); + return Err(ReserveError::Protocol( + ProtocolViolation::NoAddressesInReservation, + )); } let addrs = reservation @@ -154,7 +185,7 @@ pub(crate) async fn handle_reserve_message_response( .into_iter() .map(|b| Multiaddr::try_from(b.to_vec())) .collect::, _>>() - .map_err(|_| FatalUpgradeError::InvalidReservationAddrs)?; + .map_err(|_| ReserveError::Protocol(ProtocolViolation::InvalidReservationAddrs))?; let renewal_timeout = reservation .expire @@ -168,25 +199,25 @@ pub(crate) async fn handle_reserve_message_response( .and_then(|duration| duration.checked_sub(duration / 4)) .map(Duration::from_secs) .map(Delay::new) - .ok_or(FatalUpgradeError::InvalidReservationExpiration)?; + .ok_or(ReserveError::Protocol( + ProtocolViolation::InvalidReservationExpiration, + ))?; - Ok(Ok(Reservation { + Ok(Reservation { renewal_timeout, addrs, limit, - to_listener, - })) + }) } -pub(crate) async fn handle_connection_message_response( +pub(crate) async fn open_circuit( protocol: Stream, - remote_peer_id: PeerId, - con_command: Command, -) -> Result, CircuitFailedReason>, FatalUpgradeError> { + dst_peer_id: PeerId, +) -> Result { let msg = proto::HopMessage { type_pb: proto::HopMessageType::CONNECT, peer: Some(proto::Peer { - id: con_command.dst_peer_id.to_bytes(), + id: dst_peer_id.to_bytes(), addrs: vec![], }), reservation: None, @@ -196,9 +227,7 @@ pub(crate) async fn handle_connection_message_response( let mut substream = Framed::new(protocol, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); - if substream.send(msg).await.is_err() { - return Err(FatalUpgradeError::StreamClosed); - } + substream.send(msg).await?; let proto::HopMessage { type_pb, @@ -206,17 +235,21 @@ pub(crate) async fn handle_connection_message_response( reservation: _, limit, status, - } = match substream.next().await { - Some(Ok(r)) => r, - _ => return Err(FatalUpgradeError::StreamClosed), - }; + } = substream + .next() + .await + .ok_or(ConnectError::Io(io::ErrorKind::UnexpectedEof.into()))??; match type_pb { proto::HopMessageType::CONNECT => { - return Err(FatalUpgradeError::UnexpectedTypeConnect); + return Err(ConnectError::Protocol( + ProtocolViolation::UnexpectedTypeConnect, + )); } proto::HopMessageType::RESERVE => { - return Err(FatalUpgradeError::UnexpectedTypeReserve); + return Err(ConnectError::Protocol( + ProtocolViolation::UnexpectedTypeReserve, + )); } proto::HopMessageType::STATUS => {} } @@ -224,22 +257,26 @@ pub(crate) async fn handle_connection_message_response( match status { Some(proto::Status::OK) => {} Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => { - return Ok(Err(CircuitFailedReason::ResourceLimitExceeded)); + return Err(ConnectError::ResourceLimitExceeded); } Some(proto::Status::CONNECTION_FAILED) => { - return Ok(Err(CircuitFailedReason::ConnectionFailed)); + return Err(ConnectError::ConnectionFailed); } Some(proto::Status::NO_RESERVATION) => { - return Ok(Err(CircuitFailedReason::NoReservation)); + return Err(ConnectError::NoReservation); } Some(proto::Status::PERMISSION_DENIED) => { - return Ok(Err(CircuitFailedReason::PermissionDenied)); + return Err(ConnectError::PermissionDenied); } Some(s) => { - return Err(FatalUpgradeError::UnexpectedStatus(s)); + return Err(ConnectError::Protocol(ProtocolViolation::UnexpectedStatus( + s, + ))); } None => { - return Err(FatalUpgradeError::MissingStatusField); + return Err(ConnectError::Protocol( + ProtocolViolation::MissingStatusField, + )); } } @@ -256,40 +293,11 @@ pub(crate) async fn handle_connection_message_response( "Expect a flushed Framed to have empty write buffer." ); - match con_command.send_back.send(Ok(priv_client::Connection { - state: priv_client::ConnectionState::new_outbound(io, read_buffer.freeze()), - })) { - Ok(()) => Ok(Ok(Some(Circuit { limit }))), - Err(_) => { - debug!( - "Oneshot to `client::transport::Dial` future dropped. \ - Dropping established relayed connection to {:?}.", - remote_peer_id, - ); - - Ok(Ok(None)) - } - } -} - -pub(crate) enum OutboundStreamInfo { - Reserve(mpsc::Sender), - CircuitConnection(Command), -} - -pub(crate) struct Command { - dst_peer_id: PeerId, - pub(crate) send_back: oneshot::Sender>, -} + let circuit = Circuit { + stream: io, + read_buffer: read_buffer.freeze(), + limit, + }; -impl Command { - pub(crate) fn new( - dst_peer_id: PeerId, - send_back: oneshot::Sender>, - ) -> Self { - Self { - dst_peer_id, - send_back, - } - } + Ok(circuit) } diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 28273c1088b..39fc2b1f6dc 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -33,7 +33,10 @@ use libp2p_identity::PeerId; use libp2p_ping as ping; use libp2p_plaintext as plaintext; use libp2p_relay as relay; -use libp2p_swarm::{Config, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_swarm::dial_opts::DialOpts; +use libp2p_swarm::{Config, DialError, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_swarm_test::SwarmExt; +use std::error::Error; use std::time::Duration; #[test] @@ -271,6 +274,107 @@ fn handle_dial_failure() { assert!(!pool.run_until(wait_for_dial(&mut client, relay_peer_id))); } +#[test] +fn propagate_reservation_error_to_listener() { + let _ = env_logger::try_init(); + let mut pool = LocalPool::new(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay_with_config(relay::Config { + max_reservations: 0, // Will make us fail to make the reservation + ..relay::Config::default() + }); + let relay_peer_id = *relay.local_peer_id(); + + relay.listen_on(relay_addr.clone()).unwrap(); + relay.add_external_address(relay_addr.clone()); + spawn_swarm_on_pool(&pool, relay); + + let client_addr = relay_addr + .with(Protocol::P2p(relay_peer_id)) + .with(Protocol::P2pCircuit); + let mut client = build_client(); + + let reservation_listener = client.listen_on(client_addr.clone()).unwrap(); + + // Wait for connection to relay. + assert!(pool.run_until(wait_for_dial(&mut client, relay_peer_id))); + + let error = pool.run_until(client.wait(|e| match e { + SwarmEvent::ListenerClosed { + listener_id, + reason: Err(e), + .. + } if listener_id == reservation_listener => Some(e), + _ => None, + })); + + let error = error + .source() + .unwrap() + .downcast_ref::() + .unwrap(); + + assert!(matches!( + error, + relay::outbound::hop::ReserveError::ResourceLimitExceeded + )); +} + +#[test] +fn propagate_connect_error_to_unknown_peer_to_dialer() { + let _ = env_logger::try_init(); + let mut pool = LocalPool::new(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + let relay_peer_id = *relay.local_peer_id(); + + relay.listen_on(relay_addr.clone()).unwrap(); + relay.add_external_address(relay_addr.clone()); + spawn_swarm_on_pool(&pool, relay); + + let mut src = build_client(); + + let dst_peer_id = PeerId::random(); // We don't have a destination peer in this test, so the CONNECT request will fail. + let dst_addr = relay_addr + .with(Protocol::P2p(relay_peer_id)) + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(dst_peer_id)); + + let opts = DialOpts::from(dst_addr.clone()); + let circuit_connection_id = opts.connection_id(); + + src.dial(opts).unwrap(); + + let (failed_address, error) = pool.run_until(src.wait(|e| match e { + SwarmEvent::OutgoingConnectionError { + connection_id, + error: DialError::Transport(mut errors), + .. + } if connection_id == circuit_connection_id => { + assert_eq!(errors.len(), 1); + Some(errors.remove(0)) + } + _ => None, + })); + + // This is a bit wonky but we need to get the _actual_ source error :) + let error = error + .source() + .unwrap() + .source() + .unwrap() + .downcast_ref::() + .unwrap(); + + assert_eq!(failed_address, dst_addr); + assert!(matches!( + error, + relay::outbound::hop::ConnectError::NoReservation + )); +} + #[test] fn reuse_connection() { let _ = env_logger::try_init(); @@ -309,6 +413,13 @@ fn reuse_connection() { } fn build_relay() -> Swarm { + build_relay_with_config(relay::Config { + reservation_duration: Duration::from_secs(2), + ..Default::default() + }) +} + +fn build_relay_with_config(config: relay::Config) -> Swarm { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = local_key.public().to_peer_id(); @@ -318,13 +429,7 @@ fn build_relay() -> Swarm { transport, Relay { ping: ping::Behaviour::new(ping::Config::new()), - relay: relay::Behaviour::new( - local_peer_id, - relay::Config { - reservation_duration: Duration::from_secs(2), - ..Default::default() - }, - ), + relay: relay::Behaviour::new(local_peer_id, config), }, local_peer_id, Config::with_async_std_executor(),