From 823d0b2b7551fb69bb0a62cec11a51ebd3613928 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 1 Nov 2023 12:51:44 +1100 Subject: [PATCH] feat(relay): don't close connections upon errors in relay server To remove the usages of `ConnectionHandlerEvent::Close` from the relay-server, we unify what used to be called `CircuitFailedReason` and `FatalUpgradeError`. Whilst the errors may be fatal for the particular circuit, they are not necessarily fatal for the entire connection. Related: #3591. Resolves: #4716. Pull-Request: #4718. --- misc/metrics/src/relay.rs | 5 + protocols/relay/CHANGELOG.md | 7 + protocols/relay/src/behaviour.rs | 32 +- protocols/relay/src/behaviour/handler.rs | 293 ++++++++++-------- protocols/relay/src/lib.rs | 7 +- protocols/relay/src/priv_client/handler.rs | 5 +- protocols/relay/src/protocol/inbound_hop.rs | 36 +-- protocols/relay/src/protocol/inbound_stop.rs | 10 +- protocols/relay/src/protocol/outbound_hop.rs | 4 - protocols/relay/src/protocol/outbound_stop.rs | 167 ++++------ 10 files changed, 281 insertions(+), 285 deletions(-) diff --git a/misc/metrics/src/relay.rs b/misc/metrics/src/relay.rs index 9ba692721e9..607daf3f1e1 100644 --- a/misc/metrics/src/relay.rs +++ b/misc/metrics/src/relay.rs @@ -66,20 +66,25 @@ impl From<&libp2p_relay::Event> for EventType { fn from(event: &libp2p_relay::Event) -> Self { match event { libp2p_relay::Event::ReservationReqAccepted { .. } => EventType::ReservationReqAccepted, + #[allow(deprecated)] libp2p_relay::Event::ReservationReqAcceptFailed { .. } => { EventType::ReservationReqAcceptFailed } libp2p_relay::Event::ReservationReqDenied { .. } => EventType::ReservationReqDenied, + #[allow(deprecated)] libp2p_relay::Event::ReservationReqDenyFailed { .. } => { EventType::ReservationReqDenyFailed } libp2p_relay::Event::ReservationTimedOut { .. } => EventType::ReservationTimedOut, libp2p_relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied, + #[allow(deprecated)] libp2p_relay::Event::CircuitReqOutboundConnectFailed { .. } => { EventType::CircuitReqOutboundConnectFailed } + #[allow(deprecated)] libp2p_relay::Event::CircuitReqDenyFailed { .. } => EventType::CircuitReqDenyFailed, libp2p_relay::Event::CircuitReqAccepted { .. } => EventType::CircuitReqAccepted, + #[allow(deprecated)] libp2p_relay::Event::CircuitReqAcceptFailed { .. } => EventType::CircuitReqAcceptFailed, libp2p_relay::Event::CircuitClosed { .. } => EventType::CircuitClosed, } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 20d8370cf6d..200cc4bc18d 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,5 +1,12 @@ ## 0.17.0 - unreleased +- Don't close connections on protocol failures within the relay-server. + To achieve this, error handling was restructured: + - `libp2p::relay::outbound::stop::FatalUpgradeError` has been removed. + - `libp2p::relay::outbound::stop::{Error, ProtocolViolation}` have been introduced. + - Several variants of `libp2p::relay::Event` have been deprecated. + + See [PR 4718](https://github.com/libp2p/rust-libp2p/pull/4718). - Fix a rare race condition when making a reservation on a relay that could lead to a failed reservation. See [PR 4747](https://github.com/libp2p/rust-lib2pp/pulls/4747). - Propagate errors of relay client to the listener / dialer. diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 256bb463b5a..5b9f1fe5843 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -34,7 +34,7 @@ use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ dummy, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, NotifyHandler, - StreamUpgradeError, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; @@ -169,16 +169,22 @@ pub enum Event { renewed: bool, }, /// Accepting an inbound reservation request failed. + #[deprecated( + note = "Will be removed in favor of logging them internally, see for details." + )] ReservationReqAcceptFailed { src_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::Error, }, /// An inbound reservation request has been denied. ReservationReqDenied { src_peer_id: PeerId }, /// Denying an inbound reservation request has failed. + #[deprecated( + note = "Will be removed in favor of logging them internally, see for details." + )] ReservationReqDenyFailed { src_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::Error, }, /// An inbound reservation has timed out. ReservationTimedOut { src_peer_id: PeerId }, @@ -188,10 +194,13 @@ pub enum Event { dst_peer_id: PeerId, }, /// Denying an inbound circuit request failed. + #[deprecated( + note = "Will be removed in favor of logging them internally, see for details." + )] CircuitReqDenyFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::Error, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -199,16 +208,22 @@ pub enum Event { dst_peer_id: PeerId, }, /// An outbound connect for an inbound cirucit request failed. + #[deprecated( + note = "Will be removed in favor of logging them internally, see for details." + )] CircuitReqOutboundConnectFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: StreamUpgradeError, + error: outbound_stop::Error, }, /// Accepting an inbound circuit request failed. + #[deprecated( + note = "Will be removed in favor of logging them internally, see for details." + )] CircuitReqAcceptFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::Error, }, /// An inbound circuit has closed. CircuitClosed { @@ -455,6 +470,7 @@ impl NetworkBehaviour for Behaviour { )); } handler::Event::ReservationReqAcceptFailed { error } => { + #[allow(deprecated)] self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::ReservationReqAcceptFailed { src_peer_id: event_source, @@ -470,6 +486,7 @@ impl NetworkBehaviour for Behaviour { )); } handler::Event::ReservationReqDenyFailed { error } => { + #[allow(deprecated)] self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::ReservationReqDenyFailed { src_peer_id: event_source, @@ -592,6 +609,7 @@ impl NetworkBehaviour for Behaviour { self.circuits.remove(circuit_id); } + #[allow(deprecated)] self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::CircuitReqDenyFailed { src_peer_id: event_source, @@ -637,6 +655,7 @@ impl NetworkBehaviour for Behaviour { status, }), }); + #[allow(deprecated)] self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::CircuitReqOutboundConnectFailed { src_peer_id, @@ -662,6 +681,7 @@ impl NetworkBehaviour for Behaviour { error, } => { self.circuits.remove(circuit_id); + #[allow(deprecated)] self.queued_actions.push_back(ToSwarm::GenerateEvent( Event::CircuitReqAcceptFailed { src_peer_id: event_source, diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 60997a107e6..a2ba268392f 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -39,10 +39,10 @@ use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; -use std::collections::VecDeque; -use std::fmt; +use std::collections::{HashMap, VecDeque}; use std::task::{Context, Poll}; use std::time::Duration; +use std::{fmt, io}; const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; const STREAM_TIMEOUT: Duration = Duration::from_secs(60); @@ -151,11 +151,11 @@ pub enum Event { renewed: bool, }, /// Accepting an inbound reservation request failed. - ReservationReqAcceptFailed { error: inbound_hop::UpgradeError }, + ReservationReqAcceptFailed { error: inbound_hop::Error }, /// An inbound reservation request has been denied. ReservationReqDenied {}, /// Denying an inbound reservation request has failed. - ReservationReqDenyFailed { error: inbound_hop::UpgradeError }, + ReservationReqDenyFailed { error: inbound_hop::Error }, /// An inbound reservation has timed out. ReservationTimedOut {}, /// An inbound circuit request has been received. @@ -172,7 +172,7 @@ pub enum Event { CircuitReqDenyFailed { circuit_id: Option, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::Error, }, /// An inbound circuit request has been accepted. CircuitReqAccepted { @@ -183,7 +183,7 @@ pub enum Event { CircuitReqAcceptFailed { circuit_id: CircuitId, dst_peer_id: PeerId, - error: inbound_hop::UpgradeError, + error: inbound_hop::Error, }, /// An outbound substream for an inbound circuit request has been /// negotiated. @@ -202,7 +202,7 @@ pub enum Event { src_connection_id: ConnectionId, inbound_circuit_req: inbound_hop::CircuitReq, status: proto::Status, - error: StreamUpgradeError, + error: outbound_stop::Error, }, /// An inbound circuit has closed. CircuitClosed { @@ -343,13 +343,6 @@ pub struct Handler { >, >, - /// A pending fatal error that results in the connection being closed. - pending_error: Option< - StreamUpgradeError< - Either, - >, - >, - /// The point in time when this connection started idleing. idle_at: Option, @@ -359,44 +352,41 @@ pub struct Handler { active_reservation: Option, /// Futures accepting an inbound circuit request. - circuit_accept_futures: - Futures>, + circuit_accept_futures: Futures>, /// Futures denying an inbound circuit request. - circuit_deny_futures: Futures<( - Option, - PeerId, - Result<(), inbound_hop::UpgradeError>, - )>, + circuit_deny_futures: Futures<(Option, PeerId, Result<(), inbound_hop::Error>)>, /// Futures relaying data for circuit between two peers. circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, - pending_connect_requests: VecDeque, - - workers: futures_bounded::FuturesSet< - Either< - Result< - Either, - inbound_hop::FatalUpgradeError, - >, - Result< - Result, - outbound_stop::FatalUpgradeError, - >, - >, + /// We issue a stream upgrade for each [`PendingConnect`] request. + pending_connect_requests: VecDeque, + + /// A `CONNECT` request is in flight for these circuits. + active_connect_requests: HashMap, + + inbound_workers: futures_bounded::FuturesSet< + Result, inbound_hop::Error>, + >, + outbound_workers: futures_bounded::FuturesMap< + CircuitId, + Result, >, } impl Handler { pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler { Handler { - workers: futures_bounded::FuturesSet::new( + inbound_workers: futures_bounded::FuturesSet::new( + STREAM_TIMEOUT, + MAX_CONCURRENT_STREAMS_PER_CONNECTION, + ), + outbound_workers: futures_bounded::FuturesMap::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), endpoint, config, queued_events: Default::default(), - pending_error: Default::default(), idle_at: None, reservation_request_future: Default::default(), circuit_accept_futures: Default::default(), @@ -404,21 +394,19 @@ impl Handler { circuits: Default::default(), active_reservation: Default::default(), pending_connect_requests: Default::default(), + active_connect_requests: Default::default(), } } fn on_fully_negotiated_inbound(&mut self, stream: Stream) { if self - .workers - .try_push( - inbound_hop::handle_inbound_request( - stream, - self.config.reservation_duration, - self.config.max_circuit_duration, - self.config.max_circuit_bytes, - ) - .map(Either::Left), - ) + .inbound_workers + .try_push(inbound_hop::handle_inbound_request( + stream, + self.config.reservation_duration, + self.config.max_circuit_duration, + self.config.max_circuit_bytes, + )) .is_err() { log::warn!("Dropping inbound stream because we are at capacity") @@ -426,18 +414,29 @@ impl Handler { } fn on_fully_negotiated_outbound(&mut self, stream: Stream) { - let stop_command = self + let connect = self .pending_connect_requests .pop_front() .expect("opened a stream without a pending stop command"); if self - .workers - .try_push(outbound_stop::connect(stream, stop_command).map(Either::Right)) + .outbound_workers + .try_push( + connect.circuit_id, + outbound_stop::connect( + stream, + connect.src_peer_id, + connect.max_circuit_duration, + connect.max_circuit_bytes, + ), + ) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") } + + self.active_connect_requests + .insert(connect.circuit_id, connect); } fn on_dial_upgrade_error( @@ -447,21 +446,10 @@ impl Handler { ::OutboundProtocol, >, ) { - let (non_fatal_error, status) = match error { - StreamUpgradeError::Timeout => ( - StreamUpgradeError::Timeout, - proto::Status::CONNECTION_FAILED, - ), - StreamUpgradeError::NegotiationFailed => { - // The remote has previously done a reservation. Doing a reservation but not - // supporting the stop protocol is pointless, thus disconnecting. - self.pending_error = Some(StreamUpgradeError::NegotiationFailed); - return; - } - StreamUpgradeError::Io(e) => { - self.pending_error = Some(StreamUpgradeError::Io(e)); - return; - } + let error = match error { + StreamUpgradeError::Timeout => outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()), + StreamUpgradeError::NegotiationFailed => outbound_stop::Error::Unsupported, + StreamUpgradeError::Io(e) => outbound_stop::Error::Io(e), StreamUpgradeError::Apply(v) => void::unreachable(v), }; @@ -477,16 +465,16 @@ impl Handler { src_peer_id: stop_command.src_peer_id, src_connection_id: stop_command.src_connection_id, inbound_circuit_req: stop_command.inbound_circuit_req, - status, - error: non_fatal_error, + status: proto::Status::CONNECTION_FAILED, + error, }, )); } } enum ReservationRequestFuture { - Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), - Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), + Accepting(BoxFuture<'static, Result<(), inbound_hop::Error>>), + Denying(BoxFuture<'static, Result<(), inbound_hop::Error>>), } type Futures = FuturesUnordered>; @@ -494,9 +482,7 @@ type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; - type Error = StreamUpgradeError< - Either, - >; + type Error = void::Void; type InboundProtocol = ReadyUpgrade; type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade; @@ -542,14 +528,13 @@ impl ConnectionHandler for Handler { src_peer_id, src_connection_id, } => { - self.pending_connect_requests - .push_back(outbound_stop::PendingConnect::new( - circuit_id, - inbound_circuit_req, - src_peer_id, - src_connection_id, - &self.config, - )); + self.pending_connect_requests.push_back(PendingConnect::new( + circuit_id, + inbound_circuit_req, + src_peer_id, + src_connection_id, + &self.config, + )); self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(ReadyUpgrade::new(STOP_PROTOCOL_NAME), ()), @@ -614,12 +599,6 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - // Return queued events. if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); @@ -651,61 +630,92 @@ impl ConnectionHandler for Handler { } } - // Process protocol requests - match self.workers.poll_unpin(cx) { - Poll::Ready(Ok(Either::Left(Ok(Either::Left(inbound_reservation_req))))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::ReservationReqReceived { - inbound_reservation_req, - endpoint: self.endpoint.clone(), - renewed: self.active_reservation.is_some(), - }, - )); - } - Poll::Ready(Ok(Either::Left(Ok(Either::Right(inbound_circuit_req))))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::CircuitReqReceived { - inbound_circuit_req, - endpoint: self.endpoint.clone(), - }, - )); + // Process inbound protocol workers + loop { + match self.inbound_workers.poll_unpin(cx) { + Poll::Ready(Ok(Ok(Either::Left(inbound_reservation_req)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::ReservationReqReceived { + inbound_reservation_req, + endpoint: self.endpoint.clone(), + renewed: self.active_reservation.is_some(), + }, + )); + } + Poll::Ready(Ok(Ok(Either::Right(inbound_circuit_req)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::CircuitReqReceived { + inbound_circuit_req, + endpoint: self.endpoint.clone(), + }, + )); + } + Poll::Ready(Err(e)) => { + log::debug!("Inbound stream operation timed out: {e}"); + continue; + } + Poll::Ready(Ok(Err(e))) => { + log::debug!("Inbound stream operation failed: {e}"); + continue; + } + Poll::Pending => { + break; + } } - Poll::Ready(Ok(Either::Right(Ok(Ok(circuit))))) => { + } + + // Process outbound protocol workers + match self.outbound_workers.poll_unpin(cx) { + Poll::Ready((id, Ok(Ok(circuit)))) => { + let connect = self + .active_connect_requests + .remove(&id) + .expect("must have pending connect"); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::OutboundConnectNegotiated { - circuit_id: circuit.circuit_id, - src_peer_id: circuit.src_peer_id, - src_connection_id: circuit.src_connection_id, - inbound_circuit_req: circuit.inbound_circuit_req, + circuit_id: id, + src_peer_id: connect.src_peer_id, + src_connection_id: connect.src_connection_id, + inbound_circuit_req: connect.inbound_circuit_req, dst_stream: circuit.dst_stream, dst_pending_data: circuit.dst_pending_data, }, )); } - Poll::Ready(Ok(Either::Right(Ok(Err(circuit_failed))))) => { + Poll::Ready((id, Ok(Err(error)))) => { + let connect = self + .active_connect_requests + .remove(&id) + .expect("must have pending connect"); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( Event::OutboundConnectNegotiationFailed { - circuit_id: circuit_failed.circuit_id, - src_peer_id: circuit_failed.src_peer_id, - src_connection_id: circuit_failed.src_connection_id, - inbound_circuit_req: circuit_failed.inbound_circuit_req, - status: circuit_failed.status, - error: circuit_failed.error, + circuit_id: connect.circuit_id, + src_peer_id: connect.src_peer_id, + src_connection_id: connect.src_connection_id, + inbound_circuit_req: connect.inbound_circuit_req, + status: error.to_status(), + error, }, )); } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Timeout)); - } - Poll::Ready(Ok(Either::Left(Err(e)))) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( - Either::Left(e), - ))); - } - Poll::Ready(Ok(Either::Right(Err(e)))) => { - return Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( - Either::Right(e), - ))); + Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { + let connect = self + .active_connect_requests + .remove(&id) + .expect("must have pending connect"); + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundConnectNegotiationFailed { + circuit_id: connect.circuit_id, + src_peer_id: connect.src_peer_id, + src_connection_id: connect.src_connection_id, + inbound_circuit_req: connect.inbound_circuit_req, + status: proto::Status::CONNECTION_FAILED, // Best fit? + error: outbound_stop::Error::Io(io::ErrorKind::TimedOut.into()), + }, + )); } Poll::Pending => {} } @@ -903,3 +913,32 @@ struct CircuitParts { dst_stream: Stream, dst_pending_data: Bytes, } + +/// Holds everything we know about a to-be-issued `CONNECT` request to a peer. +struct PendingConnect { + circuit_id: CircuitId, + inbound_circuit_req: inbound_hop::CircuitReq, + src_peer_id: PeerId, + src_connection_id: ConnectionId, + max_circuit_duration: Duration, + max_circuit_bytes: u64, +} + +impl PendingConnect { + fn new( + circuit_id: CircuitId, + inbound_circuit_req: inbound_hop::CircuitReq, + src_peer_id: PeerId, + src_connection_id: ConnectionId, + config: &Config, + ) -> Self { + Self { + circuit_id, + inbound_circuit_req, + src_peer_id, + src_connection_id, + max_circuit_duration: config.max_circuit_duration, + max_circuit_bytes: config.max_circuit_bytes, + } + } +} diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 09d326be9fb..eca3578d599 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -45,7 +45,10 @@ pub use protocol::{HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; /// Types related to the relay protocol inbound. pub mod inbound { pub mod hop { - pub use crate::protocol::inbound_hop::FatalUpgradeError; + #[deprecated(note = "Renamed to `Error`.")] + pub type FatalUpgradeError = Error; + + pub use crate::protocol::inbound_hop::Error; } } @@ -55,7 +58,7 @@ pub mod outbound { pub use crate::protocol::outbound_hop::{ConnectError, ProtocolViolation, ReserveError}; } pub mod stop { - pub use crate::protocol::outbound_stop::FatalUpgradeError; + pub use crate::protocol::outbound_stop::{Error, ProtocolViolation}; } } diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index b3fb345e215..3e79b60ef97 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -21,7 +21,6 @@ use crate::priv_client::transport; use crate::protocol::{self, inbound_stop, outbound_hop}; use crate::{priv_client, proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; -use either::Either; use futures::channel::{mpsc, oneshot}; use futures::future::FutureExt; use futures_timer::Delay; @@ -231,9 +230,7 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = In; type ToBehaviour = Event; - type Error = StreamUpgradeError< - Either, - >; + type Error = void::Void; type InboundProtocol = ReadyUpgrade; type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade; diff --git a/protocols/relay/src/protocol/inbound_hop.rs b/protocols/relay/src/protocol/inbound_hop.rs index b44d29e42ce..69ec495261f 100644 --- a/protocols/relay/src/protocol/inbound_hop.rs +++ b/protocols/relay/src/protocol/inbound_hop.rs @@ -35,25 +35,11 @@ use crate::proto::message_v2::pb::mod_HopMessage::Type; use crate::protocol::MAX_MESSAGE_SIZE; #[derive(Debug, Error)] -pub enum UpgradeError { - #[error("Fatal")] - Fatal(#[from] FatalUpgradeError), -} - -impl From for UpgradeError { - fn from(error: quick_protobuf_codec::Error) -> Self { - Self::Fatal(error.into()) - } -} - -#[derive(Debug, Error)] -pub enum FatalUpgradeError { +pub enum Error { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), #[error("Stream closed")] StreamClosed, - #[error("Failed to parse response type field.")] - ParseTypeField, #[error("Failed to parse peer id.")] ParsePeerId, #[error("Expected 'peer' field to be set.")] @@ -70,7 +56,7 @@ pub struct ReservationReq { } impl ReservationReq { - pub async fn accept(self, addrs: Vec) -> Result<(), FatalUpgradeError> { + pub async fn accept(self, addrs: Vec) -> Result<(), Error> { if addrs.is_empty() { log::debug!( "Accepting relay reservation without providing external addresses of local node. \ @@ -104,7 +90,7 @@ impl ReservationReq { self.send(msg).await } - pub async fn deny(self, status: proto::Status) -> Result<(), FatalUpgradeError> { + pub async fn deny(self, status: proto::Status) -> Result<(), Error> { let msg = proto::HopMessage { type_pb: proto::HopMessageType::STATUS, peer: None, @@ -116,7 +102,7 @@ impl ReservationReq { self.send(msg).await } - async fn send(mut self, msg: proto::HopMessage) -> Result<(), FatalUpgradeError> { + async fn send(mut self, msg: proto::HopMessage) -> Result<(), Error> { self.substream.send(msg).await?; self.substream.flush().await?; self.substream.close().await?; @@ -135,7 +121,7 @@ impl CircuitReq { self.dst } - pub async fn accept(mut self) -> Result<(Stream, Bytes), FatalUpgradeError> { + pub async fn accept(mut self) -> Result<(Stream, Bytes), Error> { let msg = proto::HopMessage { type_pb: proto::HopMessageType::STATUS, peer: None, @@ -160,7 +146,7 @@ impl CircuitReq { Ok((io, read_buffer.freeze())) } - pub async fn deny(mut self, status: proto::Status) -> Result<(), FatalUpgradeError> { + pub async fn deny(mut self, status: proto::Status) -> Result<(), Error> { let msg = proto::HopMessage { type_pb: proto::HopMessageType::STATUS, peer: None, @@ -185,13 +171,13 @@ pub(crate) async fn handle_inbound_request( reservation_duration: Duration, max_circuit_duration: Duration, max_circuit_bytes: u64, -) -> Result, FatalUpgradeError> { +) -> Result, Error> { let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); let res = substream.next().await; if let None | Some(Err(_)) = res { - return Err(FatalUpgradeError::StreamClosed); + return Err(Error::StreamClosed); } let proto::HopMessage { @@ -212,17 +198,17 @@ pub(crate) async fn handle_inbound_request( Type::CONNECT => { let peer_id_res = match peer { Some(r) => PeerId::from_bytes(&r.id), - None => return Err(FatalUpgradeError::MissingPeer), + None => return Err(Error::MissingPeer), }; let dst = match peer_id_res { Ok(res) => res, - Err(_) => return Err(FatalUpgradeError::ParsePeerId), + Err(_) => return Err(Error::ParsePeerId), }; Either::Right(CircuitReq { dst, substream }) } - Type::STATUS => return Err(FatalUpgradeError::UnexpectedTypeStatus), + Type::STATUS => return Err(Error::UnexpectedTypeStatus), }; Ok(req) diff --git a/protocols/relay/src/protocol/inbound_stop.rs b/protocols/relay/src/protocol/inbound_stop.rs index 22b8244080f..b698a5ff769 100644 --- a/protocols/relay/src/protocol/inbound_stop.rs +++ b/protocols/relay/src/protocol/inbound_stop.rs @@ -72,11 +72,9 @@ impl From for Error { } #[derive(Debug, Error)] -pub enum ProtocolViolation { +pub(crate) enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Failed to parse response type field.")] - ParseTypeField, #[error("Failed to parse peer id.")] ParsePeerId, #[error("Expected 'peer' field to be set.")] @@ -132,10 +130,12 @@ impl Circuit { status: Some(status), }; - self.send(msg).await.map_err(Into::into) + self.send(msg).await?; + + Ok(()) } - async fn send(&mut self, msg: proto::StopMessage) -> Result<(), quick_protobuf_codec::Error> { + async fn send(&mut self, msg: proto::StopMessage) -> Result<(), Error> { self.substream.send(msg).await?; self.substream.flush().await?; diff --git a/protocols/relay/src/protocol/outbound_hop.rs b/protocols/relay/src/protocol/outbound_hop.rs index 4e9b512c3e7..2a39ec5fd4a 100644 --- a/protocols/relay/src/protocol/outbound_hop.rs +++ b/protocols/relay/src/protocol/outbound_hop.rs @@ -80,14 +80,10 @@ pub enum ProtocolViolation { InvalidReservationExpiration, #[error("Invalid addresses in reservation.")] InvalidReservationAddrs, - #[error("Failed to parse response type field.")] - ParseTypeField, #[error("Unexpected message type 'connect'")] UnexpectedTypeConnect, #[error("Unexpected message type 'reserve'")] UnexpectedTypeReserve, - #[error("Failed to parse response type field.")] - ParseStatusField, #[error("Unexpected message status '{0:?}'")] UnexpectedStatus(proto::Status), } diff --git a/protocols/relay/src/protocol/outbound_stop.rs b/protocols/relay/src/protocol/outbound_stop.rs index 6f715f14f14..525ebc10821 100644 --- a/protocols/relay/src/protocol/outbound_stop.rs +++ b/protocols/relay/src/protocol/outbound_stop.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::io; use std::time::Duration; use asynchronous_codec::{Framed, FramedParts}; @@ -26,124 +27,111 @@ use futures::prelude::*; use thiserror::Error; use libp2p_identity::PeerId; -use libp2p_swarm::{ConnectionId, Stream, StreamUpgradeError}; +use libp2p_swarm::Stream; -use crate::behaviour::handler::Config; -use crate::protocol::{inbound_hop, MAX_MESSAGE_SIZE}; -use crate::{proto, CircuitId}; +use crate::protocol::MAX_MESSAGE_SIZE; +use crate::{proto, STOP_PROTOCOL_NAME}; #[derive(Debug, Error)] -pub(crate) enum UpgradeError { - #[error("Circuit failed")] - CircuitFailed(#[from] CircuitFailedReason), - #[error("Fatal")] - Fatal(#[from] FatalUpgradeError), -} - -impl From for UpgradeError { - fn from(error: quick_protobuf_codec::Error) -> Self { - Self::Fatal(error.into()) - } -} - -#[derive(Debug, Error)] -pub enum CircuitFailedReason { +pub enum Error { #[error("Remote reported resource limit exceeded.")] ResourceLimitExceeded, #[error("Remote reported permission denied.")] PermissionDenied, + #[error("Remote does not support the `{STOP_PROTOCOL_NAME}` protocol")] + Unsupported, + #[error("IO error")] + Io(#[source] io::Error), + #[error("Protocol error")] + Protocol(#[from] ProtocolViolation), +} + +impl Error { + pub(crate) fn to_status(&self) -> proto::Status { + match self { + Error::ResourceLimitExceeded => proto::Status::RESOURCE_LIMIT_EXCEEDED, + Error::PermissionDenied => proto::Status::PERMISSION_DENIED, + Error::Unsupported => proto::Status::CONNECTION_FAILED, + Error::Io(_) => proto::Status::CONNECTION_FAILED, + Error::Protocol( + ProtocolViolation::UnexpectedStatus(_) | ProtocolViolation::UnexpectedTypeConnect, + ) => proto::Status::UNEXPECTED_MESSAGE, + Error::Protocol(_) => proto::Status::MALFORMED_MESSAGE, + } + } } +/// Depicts all forms of protocol violations. #[derive(Debug, Error)] -pub enum FatalUpgradeError { +pub enum ProtocolViolation { #[error(transparent)] Codec(#[from] quick_protobuf_codec::Error), - #[error("Stream closed")] - StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, - #[error("Failed to parse response type field.")] - ParseTypeField, #[error("Unexpected message type 'connect'")] UnexpectedTypeConnect, - #[error("Failed to parse response type field.")] - ParseStatusField, #[error("Unexpected message status '{0:?}'")] UnexpectedStatus(proto::Status), } +impl From for Error { + fn from(e: quick_protobuf_codec::Error) -> Self { + Error::Protocol(ProtocolViolation::Codec(e)) + } +} + /// Attempts to _connect_ to a peer via the given stream. pub(crate) async fn connect( io: Stream, - stop_command: PendingConnect, -) -> Result, FatalUpgradeError> { + src_peer_id: PeerId, + max_duration: Duration, + max_bytes: u64, +) -> Result { let msg = proto::StopMessage { type_pb: proto::StopMessageType::CONNECT, peer: Some(proto::Peer { - id: stop_command.src_peer_id.to_bytes(), + id: src_peer_id.to_bytes(), addrs: vec![], }), limit: Some(proto::Limit { duration: Some( - stop_command - .max_circuit_duration + max_duration .as_secs() .try_into() .expect("`max_circuit_duration` not to exceed `u32::MAX`."), ), - data: Some(stop_command.max_circuit_bytes), + data: Some(max_bytes), }), status: None, }; let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); - if substream.send(msg).await.is_err() { - return Err(FatalUpgradeError::StreamClosed); - } - - let res = substream.next().await; - - if let None | Some(Err(_)) = res { - return Err(FatalUpgradeError::StreamClosed); - } + substream.send(msg).await?; let proto::StopMessage { type_pb, peer: _, limit: _, status, - } = res.unwrap().expect("should be ok"); + } = substream + .next() + .await + .ok_or(Error::Io(io::ErrorKind::UnexpectedEof.into()))??; match type_pb { - proto::StopMessageType::CONNECT => return Err(FatalUpgradeError::UnexpectedTypeConnect), + proto::StopMessageType::CONNECT => { + return Err(Error::Protocol(ProtocolViolation::UnexpectedTypeConnect)) + } proto::StopMessageType::STATUS => {} } match status { Some(proto::Status::OK) => {} - Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => { - return Ok(Err(CircuitFailed { - circuit_id: stop_command.circuit_id, - src_peer_id: stop_command.src_peer_id, - src_connection_id: stop_command.src_connection_id, - inbound_circuit_req: stop_command.inbound_circuit_req, - status: proto::Status::RESOURCE_LIMIT_EXCEEDED, - error: StreamUpgradeError::Apply(CircuitFailedReason::ResourceLimitExceeded), - })) - } - Some(proto::Status::PERMISSION_DENIED) => { - return Ok(Err(CircuitFailed { - circuit_id: stop_command.circuit_id, - src_peer_id: stop_command.src_peer_id, - src_connection_id: stop_command.src_connection_id, - inbound_circuit_req: stop_command.inbound_circuit_req, - status: proto::Status::PERMISSION_DENIED, - error: StreamUpgradeError::Apply(CircuitFailedReason::PermissionDenied), - })) - } - Some(s) => return Err(FatalUpgradeError::UnexpectedStatus(s)), - None => return Err(FatalUpgradeError::MissingStatusField), + Some(proto::Status::RESOURCE_LIMIT_EXCEEDED) => return Err(Error::ResourceLimitExceeded), + Some(proto::Status::PERMISSION_DENIED) => return Err(Error::PermissionDenied), + Some(s) => return Err(Error::Protocol(ProtocolViolation::UnexpectedStatus(s))), + None => return Err(Error::Protocol(ProtocolViolation::MissingStatusField)), } let FramedParts { @@ -157,58 +145,13 @@ pub(crate) async fn connect( "Expect a flushed Framed to have an empty write buffer." ); - Ok(Ok(Circuit { - circuit_id: stop_command.circuit_id, - src_peer_id: stop_command.src_peer_id, - src_connection_id: stop_command.src_connection_id, - inbound_circuit_req: stop_command.inbound_circuit_req, + Ok(Circuit { dst_stream: io, dst_pending_data: read_buffer.freeze(), - })) + }) } pub(crate) struct Circuit { - pub(crate) circuit_id: CircuitId, - pub(crate) src_peer_id: PeerId, - pub(crate) src_connection_id: ConnectionId, - pub(crate) inbound_circuit_req: inbound_hop::CircuitReq, pub(crate) dst_stream: Stream, pub(crate) dst_pending_data: Bytes, } - -pub(crate) struct CircuitFailed { - pub(crate) circuit_id: CircuitId, - pub(crate) src_peer_id: PeerId, - pub(crate) src_connection_id: ConnectionId, - pub(crate) inbound_circuit_req: inbound_hop::CircuitReq, - pub(crate) status: proto::Status, - pub(crate) error: StreamUpgradeError, -} - -pub(crate) struct PendingConnect { - pub(crate) circuit_id: CircuitId, - pub(crate) inbound_circuit_req: inbound_hop::CircuitReq, - pub(crate) src_peer_id: PeerId, - pub(crate) src_connection_id: ConnectionId, - max_circuit_duration: Duration, - max_circuit_bytes: u64, -} - -impl PendingConnect { - pub(crate) fn new( - circuit_id: CircuitId, - inbound_circuit_req: inbound_hop::CircuitReq, - src_peer_id: PeerId, - src_connection_id: ConnectionId, - config: &Config, - ) -> Self { - Self { - circuit_id, - inbound_circuit_req, - src_peer_id, - src_connection_id, - max_circuit_duration: config.max_circuit_duration, - max_circuit_bytes: config.max_circuit_bytes, - } - } -}