diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index 68c3bce3..7fb6ac37 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -389,7 +389,10 @@ impl Stream for TransportService { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { while let Poll::Ready(event) = self.rx.poll_recv(cx) { match event { - None => return Poll::Ready(None), + None => { + tracing::warn!(target: LOG_TARGET, "transport service closed"); + return Poll::Ready(None); + } Some(InnerTransportEvent::ConnectionEstablished { peer, endpoint, diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index cb752244..b13420f7 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -493,6 +493,231 @@ impl TcpConnection { }) } + /// Handles the yamux substream. + /// + /// Returns `true` if the connection handler should exit. + async fn handle_yamux_substream( + &mut self, + substream: Option>, + ) -> crate::Result { + match substream { + Some(Ok(stream)) => { + let substream_id = { + let substream_id = self.next_substream_id.fetch_add(1usize, Ordering::Relaxed); + SubstreamId::from(substream_id) + }; + let protocols = self.protocol_set.protocols(); + let permit = self.protocol_set.try_get_permit().ok_or(Error::ConnectionClosed)?; + let open_timeout = self.substream_open_timeout; + + self.pending_substreams.push(Box::pin(async move { + match tokio::time::timeout( + open_timeout, + Self::accept_substream( + stream, + permit, + substream_id, + protocols, + open_timeout, + ), + ) + .await + { + Ok(Ok(substream)) => Ok(substream), + Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { + protocol: None, + substream_id: None, + error: SubstreamError::NegotiationError(error), + }), + Err(_) => Err(ConnectionError::Timeout { + protocol: None, + substream_id: None, + }), + } + })); + + Ok(false) + } + Some(Err(error)) => { + tracing::debug!( + target: LOG_TARGET, + peer = ?self.peer, + ?error, + "connection closed with error", + ); + + self.protocol_set + .report_connection_closed(self.peer, self.endpoint.connection_id()) + .await?; + Ok(true) + } + None => { + tracing::debug!(target: LOG_TARGET, peer = ?self.peer, "connection closed"); + self.protocol_set + .report_connection_closed(self.peer, self.endpoint.connection_id()) + .await?; + Ok(true) + } + } + } + + /// Handles negotiated substream results. + async fn handle_negotiated_substream( + &mut self, + result: Result, + ) { + match result { + // TODO: return error to protocol + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?error, + "failed to accept/open substream", + ); + + let (protocol, substream_id, error) = match error { + ConnectionError::Timeout { + protocol, + substream_id, + } => ( + protocol, + substream_id, + SubstreamError::NegotiationError(NegotiationError::Timeout), + ), + ConnectionError::FailedToNegotiate { + protocol, + substream_id, + error, + } => (protocol, substream_id, error), + }; + + match (protocol, substream_id) { + (Some(protocol), Some(substream_id)) => { + if let Err(error) = self + .protocol_set + .report_substream_open_failure(protocol.clone(), substream_id, error) + .await + { + tracing::error!( + target: LOG_TARGET, + ?protocol, + endpoint = ?self.endpoint, + ?error, + "failed to register substream open failure to protocol" + ); + } + } + _ => {} + } + } + Ok(substream) => { + let protocol = substream.protocol.clone(); + let direction = substream.direction; + let substream_id = substream.substream_id; + let socket = FuturesAsyncReadCompatExt::compat(substream.io); + let bandwidth_sink = self.bandwidth_sink.clone(); + + let substream = substream::Substream::new_tcp( + self.peer, + substream_id, + Substream::new(socket, bandwidth_sink, substream.permit), + self.protocol_set.protocol_codec(&protocol), + ); + + if let Err(error) = self + .protocol_set + .report_substream_open(self.peer, protocol.clone(), direction, substream) + .await + { + tracing::error!( + target: LOG_TARGET, + ?protocol, + peer = ?self.peer, + endpoint = ?self.endpoint, + ?error, + "failed to register opened substream to protocol", + ); + } + } + } + } + + /// Handles protocol command. + /// + /// Returns `true` if the connection handler should exit. + async fn handle_protocol_command( + &mut self, + command: Option, + ) -> crate::Result { + match command { + Some(ProtocolCommand::OpenSubstream { + protocol, + fallback_names, + substream_id, + permit, + }) => { + let control = self.control.clone(); + let open_timeout = self.substream_open_timeout; + + tracing::trace!( + target: LOG_TARGET, + ?protocol, + ?substream_id, + "open substream", + ); + + self.pending_substreams.push(Box::pin(async move { + match tokio::time::timeout( + open_timeout, + Self::open_substream( + control, + substream_id, + permit, + protocol.clone(), + fallback_names, + open_timeout, + ), + ) + .await + { + Ok(Ok(substream)) => Ok(substream), + Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { + protocol: Some(protocol), + substream_id: Some(substream_id), + error, + }), + Err(_) => Err(ConnectionError::Timeout { + protocol: Some(protocol), + substream_id: Some(substream_id), + }), + } + })); + + Ok(false) + } + Some(ProtocolCommand::ForceClose) => { + tracing::debug!( + target: LOG_TARGET, + peer = ?self.peer, + connection_id = ?self.endpoint.connection_id(), + "force closing connection", + ); + + self.protocol_set + .report_connection_closed(self.peer, self.endpoint.connection_id()) + .await?; + Ok(true) + } + None => { + tracing::debug!(target: LOG_TARGET, "protocols have disconnected, closing connection"); + self.protocol_set + .report_connection_closed(self.peer, self.endpoint.connection_id()) + .await?; + Ok(true) + } + } + } + /// Start connection event loop. pub(crate) async fn start(mut self) -> crate::Result<()> { self.protocol_set @@ -501,169 +726,17 @@ impl TcpConnection { loop { tokio::select! { - substream = self.connection.next() => match substream { - Some(Ok(stream)) => { - let substream_id = { - let substream_id = self.next_substream_id.fetch_add(1usize, Ordering::Relaxed); - SubstreamId::from(substream_id) - }; - let protocols = self.protocol_set.protocols(); - let permit = self.protocol_set.try_get_permit().ok_or(Error::ConnectionClosed)?; - let open_timeout = self.substream_open_timeout; - - self.pending_substreams.push(Box::pin(async move { - match tokio::time::timeout( - open_timeout, - Self::accept_substream(stream, permit, substream_id, protocols, open_timeout), - ) - .await - { - Ok(Ok(substream)) => Ok(substream), - Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { - protocol: None, - substream_id: None, - error: SubstreamError::NegotiationError(error), - }), - Err(_) => Err(ConnectionError::Timeout { - protocol: None, - substream_id: None - }), - } - })); - }, - Some(Err(error)) => { - tracing::debug!( - target: LOG_TARGET, - peer = ?self.peer, - ?error, - "connection closed with error", - ); - self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await?; - - return Ok(()) - } - None => { - tracing::debug!(target: LOG_TARGET, peer = ?self.peer, "connection closed"); - self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await?; - - return Ok(()) + substream = self.connection.next() => { + if self.handle_yamux_substream(substream).await? { + return Ok(()); } }, - // TODO: move this to a function substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { - match substream { - // TODO: return error to protocol - Err(error) => { - tracing::debug!( - target: LOG_TARGET, - ?error, - "failed to accept/open substream", - ); - - let (protocol, substream_id, error) = match error { - ConnectionError::Timeout { protocol, substream_id } => { - (protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout)) - } - ConnectionError::FailedToNegotiate { protocol, substream_id, error } => { - (protocol, substream_id, error) - } - }; - - match (protocol, substream_id) { - (Some(protocol), Some(substream_id)) => { - if let Err(error) = self.protocol_set - .report_substream_open_failure(protocol, substream_id, error) - .await - { - tracing::error!( - target: LOG_TARGET, - ?error, - "failed to register opened substream to protocol" - ); - } - } - _ => {} - } - } - Ok(substream) => { - let protocol = substream.protocol.clone(); - let direction = substream.direction; - let substream_id = substream.substream_id; - let socket = FuturesAsyncReadCompatExt::compat(substream.io); - let bandwidth_sink = self.bandwidth_sink.clone(); - - let substream = substream::Substream::new_tcp( - self.peer, - substream_id, - Substream::new(socket, bandwidth_sink, substream.permit), - self.protocol_set.protocol_codec(&protocol) - ); - - if let Err(error) = self.protocol_set - .report_substream_open(self.peer, protocol, direction, substream) - .await - { - tracing::error!( - target: LOG_TARGET, - ?error, - "failed to register opened substream to protocol", - ); - } - } - } + self.handle_negotiated_substream(substream).await; } - protocol = self.protocol_set.next() => match protocol { - Some(ProtocolCommand::OpenSubstream { protocol, fallback_names, substream_id, permit }) => { - let control = self.control.clone(); - let open_timeout = self.substream_open_timeout; - - tracing::trace!( - target: LOG_TARGET, - ?protocol, - ?substream_id, - "open substream", - ); - - self.pending_substreams.push(Box::pin(async move { - match tokio::time::timeout( - open_timeout, - Self::open_substream( - control, - substream_id, - permit, - protocol.clone(), - fallback_names, - open_timeout, - ), - ) - .await - { - Ok(Ok(substream)) => Ok(substream), - Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate { - protocol: Some(protocol), - substream_id: Some(substream_id), - error, - }), - Err(_) => Err(ConnectionError::Timeout { - protocol: Some(protocol), - substream_id: Some(substream_id) - }), - } - })); - } - Some(ProtocolCommand::ForceClose) => { - tracing::debug!( - target: LOG_TARGET, - peer = ?self.peer, - connection_id = ?self.endpoint.connection_id(), - "force closing connection", - ); - - return self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await - } - None => { - tracing::debug!(target: LOG_TARGET, "protocols have disconnected, closing connection"); - return self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await + protocol = self.protocol_set.next() => { + if self.handle_protocol_command(protocol).await? { + return Ok(()) } } }