Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(identify): use ReadyUpgrade for {In,Out}boundUpgrade #4563

Merged
merged 13 commits into from
Sep 28, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"]
asynchronous-codec = "0.6"
futures = "0.3.28"
futures-timer = "3.0.2"
futures-bounded = { workspace = true }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
Expand Down
200 changes: 122 additions & 78 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError};
use crate::protocol::{Info, PushInfo};
use crate::protocol::{Info, PushInfo, UpgradeError};
use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use futures_bounded::Timeout;
use futures_timer::Delay;
use libp2p_core::upgrade::SelectUpgrade;
use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
Expand All @@ -42,21 +41,27 @@ use smallvec::SmallVec;
use std::collections::HashSet;
use std::{io, task::Context, task::Poll, time::Duration};

const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct Handler {
remote_peer_id: PeerId,
inbound_identify_push: Option<BoxFuture<'static, Result<PushInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<Either<Identify, Push<OutboundPush>>, (), Event, io::Error>; 4],
[ConnectionHandlerEvent<
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
(),
Event,
io::Error,
>; 4],
>,

/// Pending identification replies, awaiting being sent.
pending_replies: FuturesUnordered<BoxFuture<'static, Result<(), UpgradeError>>>,
active_streams: futures_bounded::FuturesSet<Result<Success, UpgradeError>>,

/// Future that fires when we need to identify the node again.
trigger_next_identify: Delay,
Expand Down Expand Up @@ -125,9 +130,11 @@ impl Handler {
) -> Self {
Self {
remote_peer_id,
inbound_identify_push: Default::default(),
events: SmallVec::new(),
pending_replies: FuturesUnordered::new(),
active_streams: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
trigger_next_identify: Delay::new(initial_delay),
exchanged_one_periodic_identify: false,
interval,
Expand All @@ -152,19 +159,28 @@ impl Handler {
>,
) {
match output {
future::Either::Left(substream) => {
future::Either::Left(stream) => {
let info = self.build_info();

self.pending_replies
.push(crate::protocol::send(substream, info).boxed());
if self
.active_streams
.try_push(
protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify),
)
.is_err()
{
warn!("Dropping inbound stream because we are at capacity");
} else {
self.exchanged_one_periodic_identify = true;
}
}
future::Either::Right(fut) => {
if self.inbound_identify_push.replace(fut).is_some() {
warn!(
"New inbound identify push stream from {} while still \
upgrading previous one. Replacing previous with new.",
self.remote_peer_id,
);
future::Either::Right(stream) => {
if self
.active_streams
.try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush))
.is_err()
{
warn!("Dropping inbound identify push stream because we are at capacity");
}
}
}
Expand All @@ -180,34 +196,31 @@ impl Handler {
>,
) {
match output {
future::Either::Left(remote_info) => {
self.handle_incoming_info(&remote_info);
future::Either::Left(stream) => {
if self
.active_streams
.try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify))
.is_err()
{
warn!("Dropping outbound identify stream because we are at capacity");
}
}
future::Either::Right(stream) => {
let info = self.build_info();

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
if self
.active_streams
.try_push(
protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush),
)
.is_err()
{
warn!("Dropping outbound identify push stream because we are at capacity");
}
}
future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
)),
}
}

fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { error: err, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
let err = err.map_upgrade_err(|e| e.into_inner());
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(err),
));
self.trigger_next_identify.reset(self.interval);
}

fn build_info(&mut self) -> Info {
Info {
public_key: self.public_key.clone(),
Expand Down Expand Up @@ -268,13 +281,20 @@ impl ConnectionHandler for Handler {
type FromBehaviour = InEvent;
type ToBehaviour = Event;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<Identify, Push<InboundPush>>;
type OutboundProtocol = Either<Identify, Push<OutboundPush>>;
type InboundProtocol =
SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ())
SubstreamProtocol::new(
SelectUpgrade::new(
ReadyUpgrade::new(PROTOCOL_NAME),
ReadyUpgrade::new(PUSH_PROTOCOL_NAME),
),
(),
)
}

fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
Expand All @@ -283,21 +303,19 @@ impl ConnectionHandler for Handler {
self.external_addresses = addresses;
}
InEvent::Push => {
let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Either::Right(Push::outbound(info)), ()),
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
if self.inbound_identify_push.is_some() {
return KeepAlive::Yes;
}

if !self.pending_replies.is_empty() {
if !self.active_streams.is_empty() {
return KeepAlive::Yes;
}

Expand All @@ -317,20 +335,34 @@ impl ConnectionHandler for Handler {
// Poll the future that fires when we need to identify the node again.
if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) {
self.trigger_next_identify.reset(self.interval);
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Either::Left(Identify), ()),
let event = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)),
(),
),
};
return Poll::Ready(ev);
return Poll::Ready(event);
}

if let Some(Poll::Ready(res)) = self
.inbound_identify_push
.as_mut()
.map(|f| f.poll_unpin(cx))
{
self.inbound_identify_push.take();
match self.active_streams.poll_unpin(cx) {
Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => {
self.handle_incoming_info(&remote_info);

if let Ok(remote_push_info) = res {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
}
Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
));
}
Poll::Ready(Ok(Ok(Success::SentIdentify))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::Identification,
));
}
Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => {
if let Some(mut info) = self.remote_info.clone() {
info.merge(remote_push_info);
self.handle_incoming_info(&info);
Expand All @@ -340,16 +372,17 @@ impl ConnectionHandler for Handler {
));
};
}
}

// Check for pending replies to send.
if let Poll::Ready(Some(result)) = self.pending_replies.poll_next_unpin(cx) {
let event = result
.map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));
self.exchanged_one_periodic_identify = true;

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
Poll::Ready(Ok(Err(e))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Apply(e)),
));
}
Poll::Ready(Err(Timeout { .. })) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Timeout),
));
}
Poll::Pending => {}
}

Poll::Pending
Expand All @@ -371,8 +404,13 @@ impl ConnectionHandler for Handler {
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(
error.map_upgrade_err(|e| void::unreachable(e.into_inner())),
),
));
self.trigger_next_identify.reset(self.interval);
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
Expand All @@ -392,11 +430,10 @@ impl ConnectionHandler for Handler {
self.remote_peer_id
);

let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(Push::outbound(info)),
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
Expand All @@ -405,3 +442,10 @@ impl ConnectionHandler for Handler {
}
}
}

enum Success {
SentIdentify,
ReceivedIdentify(Info),
SentIdentifyPush,
ReceivedIdentifyPush(PushInfo),
}
Loading