Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(identify): use ReadyUpgrade for {In,Out}boundUpgrade #4563

Merged
merged 13 commits into from
Sep 28, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"]
asynchronous-codec = "0.6"
futures = "0.3.28"
futures-timer = "3.0.2"
futures-bounded = { workspace = true }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
Expand Down
209 changes: 154 additions & 55 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError};
use crate::protocol::{recv_identify, recv_push, send, UpgradeError};
use crate::protocol::{Info, PushInfo};
use crate::{PROTOCOL_NAME, PUSH_PROTOCOL_NAME};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use futures_bounded::Timeout;
use futures_timer::Delay;
use libp2p_core::upgrade::SelectUpgrade;
use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ProtocolSupport,
ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, ProtocolSupport,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
Expand All @@ -42,6 +42,9 @@ use smallvec::SmallVec;
use std::collections::HashSet;
use std::{io, task::Context, task::Poll, time::Duration};

const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
Expand All @@ -52,11 +55,22 @@ pub struct Handler {
inbound_identify_push: Option<BoxFuture<'static, Result<PushInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<Either<Identify, Push<OutboundPush>>, (), Event, io::Error>; 4],
[ConnectionHandlerEvent<
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
(),
Event,
io::Error,
>; 4],
>,

/// Pending identification replies, awaiting being sent.
pending_replies: FuturesUnordered<BoxFuture<'static, Result<(), UpgradeError>>>,
pending_replies: futures_bounded::FuturesSet<Result<(), UpgradeError>>,

/// Pending identify requests.
outbound_identify_futs: futures_bounded::FuturesSet<Result<Info, UpgradeError>>,

/// Pending identify/push requests.
outbound_identify_push_futs: futures_bounded::FuturesSet<Result<(), UpgradeError>>,
dgarus marked this conversation as resolved.
Show resolved Hide resolved

/// Future that fires when we need to identify the node again.
trigger_next_identify: Delay,
Expand Down Expand Up @@ -127,7 +141,18 @@ impl Handler {
remote_peer_id,
inbound_identify_push: Default::default(),
events: SmallVec::new(),
pending_replies: FuturesUnordered::new(),
pending_replies: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
outbound_identify_futs: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
outbound_identify_push_futs: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
trigger_next_identify: Delay::new(initial_delay),
exchanged_one_periodic_identify: false,
interval,
Expand All @@ -152,14 +177,23 @@ impl Handler {
>,
) {
match output {
future::Either::Left(substream) => {
future::Either::Left(stream) => {
let info = self.build_info();

self.pending_replies
.push(crate::protocol::send(substream, info).boxed());
if self
.pending_replies
.try_push(crate::protocol::send(stream, info))
.is_err()
{
warn!("Dropping inbound stream because we are at capacity");
}
}
future::Either::Right(fut) => {
if self.inbound_identify_push.replace(fut).is_some() {
future::Either::Right(stream) => {
if self
.inbound_identify_push
.replace(recv_push(stream).boxed())
.is_some()
dgarus marked this conversation as resolved.
Show resolved Hide resolved
{
warn!(
"New inbound identify push stream from {} while still \
upgrading previous one. Replacing previous with new.",
Expand All @@ -180,34 +214,29 @@ impl Handler {
>,
) {
match output {
future::Either::Left(remote_info) => {
self.handle_incoming_info(&remote_info);
future::Either::Left(stream) => {
if self
.outbound_identify_futs
.try_push(recv_identify(stream))
.is_err()
{
warn!("Dropping outbound identify stream because we are at capacity");
}
}
future::Either::Right(stream) => {
let info = self.build_info();

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
if self
.outbound_identify_push_futs
.try_push(send(stream, info))
.is_err()
{
warn!("Dropping outbound identify push stream because we are at capacity");
}
}
future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
)),
}
}

fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { error: err, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
let err = err.map_upgrade_err(|e| e.into_inner());
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(err),
));
self.trigger_next_identify.reset(self.interval);
}

fn build_info(&mut self) -> Info {
Info {
public_key: self.public_key.clone(),
Expand Down Expand Up @@ -268,13 +297,20 @@ impl ConnectionHandler for Handler {
type FromBehaviour = InEvent;
type ToBehaviour = Event;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<Identify, Push<InboundPush>>;
type OutboundProtocol = Either<Identify, Push<OutboundPush>>;
type InboundProtocol =
SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ())
SubstreamProtocol::new(
SelectUpgrade::new(
ReadyUpgrade::new(PROTOCOL_NAME),
ReadyUpgrade::new(PUSH_PROTOCOL_NAME),
),
(),
)
}

fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
Expand All @@ -283,10 +319,12 @@ impl ConnectionHandler for Handler {
self.external_addresses = addresses;
}
InEvent::Push => {
let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Either::Right(Push::outbound(info)), ()),
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
}
}
Expand Down Expand Up @@ -317,10 +355,13 @@ impl ConnectionHandler for Handler {
// Poll the future that fires when we need to identify the node again.
if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) {
self.trigger_next_identify.reset(self.interval);
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Either::Left(Identify), ()),
let event = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)),
(),
),
};
return Poll::Ready(ev);
return Poll::Ready(event);
}

if let Some(Poll::Ready(res)) = self
Expand All @@ -342,14 +383,70 @@ impl ConnectionHandler for Handler {
}
}

// Check for pending replies to send.
if let Poll::Ready(Some(result)) = self.pending_replies.poll_next_unpin(cx) {
let event = result
.map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));
self.exchanged_one_periodic_identify = true;
// Check for pending replies.
match self.pending_replies.poll_unpin(cx) {
Poll::Ready(Ok(Ok(()))) => {
self.exchanged_one_periodic_identify = true;

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::Identification,
));
}
Poll::Ready(Ok(Err(e))) => {
self.exchanged_one_periodic_identify = true;

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Apply(e)),
));
}
Poll::Ready(Err(Timeout { .. })) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Timeout),
));
}
Poll::Pending => {}
}

match self.outbound_identify_futs.poll_unpin(cx) {
Poll::Ready(Ok(Ok(remote_info))) => {
self.handle_incoming_info(&remote_info);

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
}
Poll::Ready(Ok(Err(e))) => {
self.trigger_next_identify.reset(self.interval);

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Apply(e)),
));
}
Poll::Ready(Err(Timeout { .. })) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Timeout),
));
}
Poll::Pending => {}
}

match self.outbound_identify_push_futs.poll_unpin(cx) {
Poll::Ready(Ok(Ok(()))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
));
}
Poll::Ready(Ok(Err(e))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Apply(e)),
));
}
Poll::Ready(Err(Timeout { .. })) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Timeout),
));
}
Poll::Pending => {}
}

Poll::Pending
Expand All @@ -371,8 +468,11 @@ impl ConnectionHandler for Handler {
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
ConnectionEvent::DialUpgradeError(_dial_upgrade_error) => {
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::NegotiationFailed),
dgarus marked this conversation as resolved.
Show resolved Hide resolved
));
self.trigger_next_identify.reset(self.interval);
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
Expand All @@ -392,11 +492,10 @@ impl ConnectionHandler for Handler {
self.remote_peer_id
);

let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(Push::outbound(info)),
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
Expand Down
Loading