From 22f1e23610d4c28653c34cdb49f8a1a98e6bbfe5 Mon Sep 17 00:00:00 2001 From: Ademola <83315082+0xcrust@users.noreply.github.com> Date: Tue, 14 Nov 2023 03:47:01 +0100 Subject: [PATCH] feat(gossipsub): process send-queue before inbound stream Process outbound stream before inbound stream in `gossipsub::EnabledHandler::poll(..)`. Pull-Request: #4778. --- protocols/gossipsub/CHANGELOG.md | 6 +- protocols/gossipsub/src/handler.rs | 128 ++++++++++++++--------------- 2 files changed, 67 insertions(+), 67 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index dd5fcf5febd..68041f2dcd3 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -10,9 +10,9 @@ - Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`. See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642). - Return typed error from config builder. - See [PR 4445]. - -[PR 4445]: https://github.com/libp2p/rust-libp2p/pull/4445 + See [PR 4445](https://github.com/libp2p/rust-libp2p/pull/4445). +- Process outbound stream before inbound stream in `EnabledHandler::poll(..)`. + See [PR 4778](https://github.com/libp2p/rust-libp2p/pull/4778). ## 0.45.2 diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 63ef96781d9..e2ec681321c 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -241,70 +241,6 @@ impl EnabledHandler { }); } - loop { - match std::mem::replace( - &mut self.inbound_substream, - Some(InboundSubstreamState::Poisoned), - ) { - // inbound idle state - Some(InboundSubstreamState::WaitingInput(mut substream)) => { - match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(message))) => { - self.last_io_activity = Instant::now(); - self.inbound_substream = - Some(InboundSubstreamState::WaitingInput(substream)); - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message)); - } - Poll::Ready(Some(Err(error))) => { - tracing::debug!("Failed to read from inbound stream: {error}"); - // Close this side of the stream. If the - // peer is still around, they will re-establish their - // outbound stream i.e. our inbound stream. - self.inbound_substream = - Some(InboundSubstreamState::Closing(substream)); - } - // peer closed the stream - Poll::Ready(None) => { - tracing::debug!("Inbound stream closed by remote"); - self.inbound_substream = - Some(InboundSubstreamState::Closing(substream)); - } - Poll::Pending => { - self.inbound_substream = - Some(InboundSubstreamState::WaitingInput(substream)); - break; - } - } - } - Some(InboundSubstreamState::Closing(mut substream)) => { - match Sink::poll_close(Pin::new(&mut substream), cx) { - Poll::Ready(res) => { - if let Err(e) = res { - // Don't close the connection but just drop the inbound substream. - // In case the remote has more to send, they will open up a new - // substream. - tracing::debug!("Inbound substream error while closing: {e}"); - } - self.inbound_substream = None; - break; - } - Poll::Pending => { - self.inbound_substream = - Some(InboundSubstreamState::Closing(substream)); - break; - } - } - } - None => { - self.inbound_substream = None; - break; - } - Some(InboundSubstreamState::Poisoned) => { - unreachable!("Error occurred during inbound stream processing") - } - } - } - // process outbound stream loop { match std::mem::replace( @@ -382,6 +318,70 @@ impl EnabledHandler { } } + loop { + match std::mem::replace( + &mut self.inbound_substream, + Some(InboundSubstreamState::Poisoned), + ) { + // inbound idle state + Some(InboundSubstreamState::WaitingInput(mut substream)) => { + match substream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(message))) => { + self.last_io_activity = Instant::now(); + self.inbound_substream = + Some(InboundSubstreamState::WaitingInput(substream)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message)); + } + Poll::Ready(Some(Err(error))) => { + tracing::debug!("Failed to read from inbound stream: {error}"); + // Close this side of the stream. If the + // peer is still around, they will re-establish their + // outbound stream i.e. our inbound stream. + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + } + // peer closed the stream + Poll::Ready(None) => { + tracing::debug!("Inbound stream closed by remote"); + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + } + Poll::Pending => { + self.inbound_substream = + Some(InboundSubstreamState::WaitingInput(substream)); + break; + } + } + } + Some(InboundSubstreamState::Closing(mut substream)) => { + match Sink::poll_close(Pin::new(&mut substream), cx) { + Poll::Ready(res) => { + if let Err(e) = res { + // Don't close the connection but just drop the inbound substream. + // In case the remote has more to send, they will open up a new + // substream. + tracing::debug!("Inbound substream error while closing: {e}"); + } + self.inbound_substream = None; + break; + } + Poll::Pending => { + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + break; + } + } + } + None => { + self.inbound_substream = None; + break; + } + Some(InboundSubstreamState::Poisoned) => { + unreachable!("Error occurred during inbound stream processing") + } + } + } + Poll::Pending } }