From cff7c4a681a366dadb9c2e7a4c8ddd10881f0d91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 7 Dec 2022 10:27:14 +0000 Subject: [PATCH 01/14] Identify: rename pending IdentifyHandler references on doc. --- protocols/identify/src/behaviour.rs | 2 +- protocols/identify/src/handler.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 10bfab3ed7d..0c31398104f 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -618,7 +618,7 @@ mod tests { // nb. Either swarm may receive the `Identified` event first, upon which // it will permit the connection to be closed, as defined by - // `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if + // `Handler::connection_keep_alive`. Hence the test succeeds if // either `Identified` event arrives correctly. async_std::task::block_on(async move { loop { diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0de54f0a006..5b8e05a09bb 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -92,7 +92,7 @@ pub struct Handler { interval: Duration, } -/// Event produced by the `IdentifyHandler`. +/// Event produced by the `Handler`. #[derive(Debug)] #[allow(clippy::large_enum_variant)] pub enum Event { @@ -111,7 +111,7 @@ pub enum Event { pub struct Push(pub Info); impl Handler { - /// Creates a new `IdentifyHandler`. + /// Creates a new `Handler`. pub fn new(initial_delay: Duration, interval: Duration, remote_peer_id: PeerId) -> Self { Self { remote_peer_id, From d703ba6893c7b6441c9c491ee0e05c4c61c38a53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 7 Dec 2022 10:57:14 +0000 Subject: [PATCH 02/14] Identify: start moving I/O from NetworkBehaviour, instead of responding pending replies from NetworkBehaviour, send them back to ConnectionHandler. ConnectionHandler for now just receives them, it's implementation of the responding will come next. --- protocols/identify/src/behaviour.rs | 97 ++++++++--------------------- protocols/identify/src/handler.rs | 42 +++++++++---- 2 files changed, 58 insertions(+), 81 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 0c31398104f..8771b980196 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,9 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, Proto, Push}; +use crate::handler::{self, InEvent, Proto, Reply}; use crate::protocol::{Info, ReplySubstream, UpgradeError}; -use futures::prelude::*; use libp2p_core::{ connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; @@ -35,7 +34,6 @@ use std::num::NonZeroUsize; use std::{ collections::{HashMap, HashSet, VecDeque}, iter::FromIterator, - pin::Pin, task::Context, task::Poll, time::Duration, @@ -51,8 +49,8 @@ pub struct Behaviour { config: Config, /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, - /// Pending replies to send. - pending_replies: VecDeque, + /// Pending requests to respond. + requests: VecDeque, /// Pending events to be emitted when polled. events: VecDeque>, /// Peers to which an active push with current information about @@ -63,18 +61,10 @@ pub struct Behaviour { } /// A pending reply to an inbound identification request. -enum Reply { - /// The reply is queued for sending. - Queued { - peer: PeerId, - io: ReplySubstream, - observed: Multiaddr, - }, - /// The reply is being sent. - Sending { - peer: PeerId, - io: Pin> + Send>>, - }, +struct Request { + peer: PeerId, + io: ReplySubstream, + observed: Multiaddr, } /// Configuration for the [`identify::Behaviour`](Behaviour). @@ -184,7 +174,7 @@ impl Behaviour { Self { config, connected: HashMap::new(), - pending_replies: VecDeque::new(), + requests: VecDeque::new(), events: VecDeque::new(), pending_push: HashSet::new(), discovered_peers, @@ -287,7 +277,7 @@ impl NetworkBehaviour for Behaviour { with an established connection and calling `NetworkBehaviour::on_event` \ with `FromSwarm::ConnectionEstablished ensures there is an entry; qed", ); - self.pending_replies.push_back(Reply::Queued { + self.requests.push_back(Request { peer: peer_id, io: sender, observed: observed.clone(), @@ -305,7 +295,7 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, params: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { @@ -333,7 +323,7 @@ impl NetworkBehaviour for Behaviour { observed_addr, }; - (*peer, Push(info)) + (*peer, InEvent::Push(info)) }) }); @@ -346,55 +336,21 @@ impl NetworkBehaviour for Behaviour { }); } - // Check for pending replies to send. - if let Some(r) = self.pending_replies.pop_front() { - let mut sending = 0; - let to_send = self.pending_replies.len() + 1; - let mut reply = Some(r); - loop { - match reply { - Some(Reply::Queued { peer, io, observed }) => { - let info = Info { - listen_addrs: listen_addrs(params), - protocols: supported_protocols(params), - public_key: self.config.local_public_key.clone(), - protocol_version: self.config.protocol_version.clone(), - agent_version: self.config.agent_version.clone(), - observed_addr: observed, - }; - let io = Box::pin(io.send(info)); - reply = Some(Reply::Sending { peer, io }); - } - Some(Reply::Sending { peer, mut io }) => { - sending += 1; - match Future::poll(Pin::new(&mut io), cx) { - Poll::Ready(Ok(())) => { - let event = Event::Sent { peer_id: peer }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - Poll::Pending => { - self.pending_replies.push_back(Reply::Sending { peer, io }); - if sending == to_send { - // All remaining futures are NotReady - break; - } else { - reply = self.pending_replies.pop_front(); - } - } - Poll::Ready(Err(err)) => { - let event = Event::Error { - peer_id: peer, - error: ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - ), - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - } - } - None => unreachable!(), - } - } + // Check for pending requests to send back to the handler for reply. + if let Some(Request { peer, io, observed }) = self.requests.pop_front() { + let info = Info { + listen_addrs: listen_addrs(params), + protocols: supported_protocols(params), + public_key: self.config.local_public_key.clone(), + protocol_version: self.config.protocol_version.clone(), + agent_version: self.config.agent_version.clone(), + observed_addr: observed, + }; + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id: peer, + handler: NotifyHandler::Any, + event: InEvent::Identify(Reply { peer, info, io }), + }); } Poll::Pending @@ -557,6 +513,7 @@ impl PeerCache { mod tests { use super::*; use futures::pin_mut; + use futures::prelude::*; use libp2p::mplex::MplexConfig; use libp2p::noise; use libp2p::tcp; diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 5b8e05a09bb..701e6ce74c2 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -64,6 +64,14 @@ impl IntoConnectionHandler for Proto { } } +/// A reply to an inbound identification request. +#[derive(Debug)] +pub struct Reply { + pub peer: PeerId, + pub io: ReplySubstream, + pub info: Info, +} + /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects @@ -106,9 +114,14 @@ pub enum Event { IdentificationError(ConnectionHandlerUpgrErr), } -/// Identifying information of the local node that is pushed to a remote. #[derive(Debug)] -pub struct Push(pub Info); +#[allow(clippy::large_enum_variant)] +pub enum InEvent { + /// Identifying information of the local node that is pushed to a remote. + Push(Info), + /// Identifying information requested from this node. + Identify(Reply), +} impl Handler { /// Creates a new `Handler`. @@ -195,7 +208,7 @@ impl Handler { } impl ConnectionHandler for Handler { - type InEvent = Push; + type InEvent = InEvent; type OutEvent = Event; type Error = io::Error; type InboundProtocol = SelectUpgrade>; @@ -207,14 +220,21 @@ impl ConnectionHandler for Handler { SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ()) } - fn on_behaviour_event(&mut self, Push(push): Self::InEvent) { - self.events - .push(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - EitherUpgrade::B(PushProtocol::outbound(push)), - (), - ), - }); + fn on_behaviour_event(&mut self, event: Self::InEvent) { + match event { + InEvent::Push(push) => { + self.events + .push(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + EitherUpgrade::B(PushProtocol::outbound(push)), + (), + ), + }); + } + InEvent::Identify(_) => { + todo!() + } + } } fn connection_keep_alive(&self) -> KeepAlive { From 3db1ec11e806de8f502fd71a88693cc81c9de534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 7 Dec 2022 13:39:55 +0000 Subject: [PATCH 03/14] identify: Move I/O from Networkehaviour, reply to the Identification requests from the ConnectionHandler. --- protocols/identify/src/behaviour.rs | 6 +++ protocols/identify/src/handler.rs | 64 +++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 8771b980196..04d1eb81474 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -261,6 +261,12 @@ impl NetworkBehaviour for Behaviour { score: AddressScore::Finite(1), }); } + handler::Event::Identification(peer) => { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent { + peer_id: peer, + })); + } handler::Event::IdentificationPushed => { self.events .push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 701e6ce74c2..6f0ff28bdc0 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -36,6 +36,7 @@ use libp2p_swarm::{ }; use log::warn; use smallvec::SmallVec; +use std::collections::VecDeque; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; pub struct Proto { @@ -64,6 +65,17 @@ impl IntoConnectionHandler for Proto { } } +/// A pending reply to an inbound identification request. +enum Pending { + /// The reply is queued for sending. + Queued(Reply), + /// The reply is being sent. + Sending { + peer: PeerId, + io: Pin> + Send>>, + }, +} + /// A reply to an inbound identification request. #[derive(Debug)] pub struct Reply { @@ -90,6 +102,9 @@ pub struct Handler { >; 4], >, + /// Pending replies to send. + pending_replies: VecDeque, + /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -106,11 +121,13 @@ pub struct Handler { pub enum Event { /// We obtained identification information from the remote. Identified(Info), + /// We replied to an identification request from the remote. + Identification(PeerId), /// We actively pushed our identification information to the remote. IdentificationPushed, /// We received a request for identification. Identify(ReplySubstream), - /// Failed to identify the remote. + /// Failed to identify the remote, or to reply to an identification request. IdentificationError(ConnectionHandlerUpgrErr), } @@ -130,6 +147,7 @@ impl Handler { remote_peer_id, inbound_identify_push: Default::default(), events: SmallVec::new(), + pending_replies: VecDeque::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, interval, @@ -231,8 +249,15 @@ impl ConnectionHandler for Handler { ), }); } - InEvent::Identify(_) => { - todo!() + InEvent::Identify(reply) => { + if !self.pending_replies.is_empty() { + warn!( + "New inbound identify request from {} while a previous one \ + is still pending. Queueing the new one.", + reply.peer, + ); + } + self.pending_replies.push_back(Pending::Queued(reply)); } } } @@ -275,6 +300,39 @@ impl ConnectionHandler for Handler { } } + // Check for pending replies to send. + if let Some(mut pending) = self.pending_replies.pop_front() { + loop { + match pending { + Pending::Queued(Reply { peer, io, info }) => { + let io = Box::pin(io.send(info)); + pending = Pending::Sending { peer, io }; + } + Pending::Sending { peer, mut io } => { + match Future::poll(Pin::new(&mut io), cx) { + Poll::Pending => { + self.pending_replies + .push_front(Pending::Sending { peer, io }); + return Poll::Pending; + } + Poll::Ready(Ok(())) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::Identification(peer), + )); + } + Poll::Ready(Err(err)) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( + libp2p_core::upgrade::UpgradeError::Apply(err), + )), + )) + } + } + } + } + } + } + Poll::Pending } From e01c02e5ae1017435d464df274bace22460b3834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 8 Dec 2022 17:59:01 +0000 Subject: [PATCH 04/14] review: Don't bubble the Substream from the handler to the behaviour, instead keep track of the info as it doesn't change. --- protocols/identify/src/behaviour.rs | 15 ++-- protocols/identify/src/handler.rs | 105 +++++++++++++++------------- 2 files changed, 63 insertions(+), 57 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 04d1eb81474..2d2445f95cf 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,16 +18,15 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, InEvent, Proto, Reply}; -use crate::protocol::{Info, ReplySubstream, UpgradeError}; +use crate::handler::{self, InEvent, Proto}; +use crate::protocol::{Info, UpgradeError}; use libp2p_core::{ connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, - IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use lru::LruCache; use std::num::NonZeroUsize; @@ -63,7 +62,6 @@ pub struct Behaviour { /// A pending reply to an inbound identification request. struct Request { peer: PeerId, - io: ReplySubstream, observed: Multiaddr, } @@ -273,7 +271,7 @@ impl NetworkBehaviour for Behaviour { peer_id, })); } - handler::Event::Identify(sender) => { + handler::Event::Identify => { let observed = self .connected .get(&peer_id) @@ -285,7 +283,6 @@ impl NetworkBehaviour for Behaviour { ); self.requests.push_back(Request { peer: peer_id, - io: sender, observed: observed.clone(), }); } @@ -343,7 +340,7 @@ impl NetworkBehaviour for Behaviour { } // Check for pending requests to send back to the handler for reply. - if let Some(Request { peer, io, observed }) = self.requests.pop_front() { + if let Some(Request { peer, observed }) = self.requests.pop_front() { let info = Info { listen_addrs: listen_addrs(params), protocols: supported_protocols(params), @@ -355,7 +352,7 @@ impl NetworkBehaviour for Behaviour { return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id: peer, handler: NotifyHandler::Any, - event: InEvent::Identify(Reply { peer, info, io }), + event: InEvent::Identify(info), }); } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 6f0ff28bdc0..63b8e58261d 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -68,7 +68,7 @@ impl IntoConnectionHandler for Proto { /// A pending reply to an inbound identification request. enum Pending { /// The reply is queued for sending. - Queued(Reply), + Queued(ReplySubstream), /// The reply is being sent. Sending { peer: PeerId, @@ -76,14 +76,6 @@ enum Pending { }, } -/// A reply to an inbound identification request. -#[derive(Debug)] -pub struct Reply { - pub peer: PeerId, - pub io: ReplySubstream, - pub info: Info, -} - /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects @@ -102,6 +94,9 @@ pub struct Handler { >; 4], >, + /// Identify request information. + info: Option, + /// Pending replies to send. pending_replies: VecDeque, @@ -126,7 +121,7 @@ pub enum Event { /// We actively pushed our identification information to the remote. IdentificationPushed, /// We received a request for identification. - Identify(ReplySubstream), + Identify, /// Failed to identify the remote, or to reply to an identification request. IdentificationError(ConnectionHandlerUpgrErr), } @@ -137,7 +132,7 @@ pub enum InEvent { /// Identifying information of the local node that is pushed to a remote. Push(Info), /// Identifying information requested from this node. - Identify(Reply), + Identify(Info), } impl Handler { @@ -147,6 +142,7 @@ impl Handler { remote_peer_id, inbound_identify_push: Default::default(), events: SmallVec::new(), + info: None, pending_replies: VecDeque::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, @@ -164,9 +160,22 @@ impl Handler { >, ) { match output { - EitherOutput::First(substream) => self - .events - .push(ConnectionHandlerEvent::Custom(Event::Identify(substream))), + EitherOutput::First(substream) => { + // If we already have `Info` we can proceed responding to the Identify request, + // if not, we request `Info` from the behaviour. + if self.info.is_none() { + self.events + .push(ConnectionHandlerEvent::Custom(Event::Identify)); + } + if !self.pending_replies.is_empty() { + warn!( + "New inbound identify request from {} while a previous one \ + is still pending. Queueing the new one.", + self.remote_peer_id, + ); + } + self.pending_replies.push_back(Pending::Queued(substream)); + } EitherOutput::Second(fut) => { if self.inbound_identify_push.replace(fut).is_some() { warn!( @@ -249,15 +258,8 @@ impl ConnectionHandler for Handler { ), }); } - InEvent::Identify(reply) => { - if !self.pending_replies.is_empty() { - warn!( - "New inbound identify request from {} while a previous one \ - is still pending. Queueing the new one.", - reply.peer, - ); - } - self.pending_replies.push_back(Pending::Queued(reply)); + InEvent::Identify(info) => { + self.info = Some(info); } } } @@ -301,31 +303,38 @@ impl ConnectionHandler for Handler { } // Check for pending replies to send. - if let Some(mut pending) = self.pending_replies.pop_front() { - loop { - match pending { - Pending::Queued(Reply { peer, io, info }) => { - let io = Box::pin(io.send(info)); - pending = Pending::Sending { peer, io }; - } - Pending::Sending { peer, mut io } => { - match Future::poll(Pin::new(&mut io), cx) { - Poll::Pending => { - self.pending_replies - .push_front(Pending::Sending { peer, io }); - return Poll::Pending; - } - Poll::Ready(Ok(())) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::Identification(peer), - )); - } - Poll::Ready(Err(err)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - )), - )) + if let Some(ref info) = self.info { + if let Some(mut pending) = self.pending_replies.pop_front() { + loop { + match pending { + Pending::Queued(io) => { + let io = Box::pin(io.send(info.clone())); + pending = Pending::Sending { + peer: self.remote_peer_id, + io, + }; + } + Pending::Sending { peer, mut io } => { + match Future::poll(Pin::new(&mut io), cx) { + Poll::Pending => { + self.pending_replies + .push_front(Pending::Sending { peer, io }); + return Poll::Pending; + } + Poll::Ready(Ok(())) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::Identification(peer), + )); + } + Poll::Ready(Err(err)) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::IdentificationError( + ConnectionHandlerUpgrErr::Upgrade( + libp2p_core::upgrade::UpgradeError::Apply(err), + ), + ), + )) + } } } } From 11a92545b63e4eba1633e149a0353d438faed0ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 8 Dec 2022 18:00:44 +0000 Subject: [PATCH 05/14] review: improve Request doc wording. --- protocols/identify/src/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 2d2445f95cf..5e84d1118d3 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -59,7 +59,7 @@ pub struct Behaviour { discovered_peers: PeerCache, } -/// A pending reply to an inbound identification request. +/// An inbound identification request. struct Request { peer: PeerId, observed: Multiaddr, From 57013bb1442c256b622d0c725d2a10d065e8c995 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 9 Dec 2022 20:34:42 +0000 Subject: [PATCH 06/14] review: separate Info details provided by the behaviour, provided on new_handler constructor Everything that doesn't change over the lifetime. --- protocols/identify/src/behaviour.rs | 46 +++++--------- protocols/identify/src/handler.rs | 99 ++++++++++++++++++++++++----- 2 files changed, 99 insertions(+), 46 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 5e84d1118d3..2068555e794 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, InEvent, Proto}; +use crate::handler::{self, BehaviourInfo, InEvent, Proto}; use crate::protocol::{Info, UpgradeError}; use libp2p_core::{ connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, @@ -48,8 +48,8 @@ pub struct Behaviour { config: Config, /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, - /// Pending requests to respond. - requests: VecDeque, + /// Information requests from the handlers to be fullfiled. + requests: VecDeque, /// Pending events to be emitted when polled. events: VecDeque>, /// Peers to which an active push with current information about @@ -59,12 +59,6 @@ pub struct Behaviour { discovered_peers: PeerCache, } -/// An inbound identification request. -struct Request { - peer: PeerId, - observed: Multiaddr, -} - /// Configuration for the [`identify::Behaviour`](Behaviour). #[non_exhaustive] #[derive(Debug, Clone)] @@ -228,13 +222,19 @@ impl NetworkBehaviour for Behaviour { type OutEvent = Event; fn new_handler(&mut self) -> Self::ConnectionHandler { - Proto::new(self.config.initial_delay, self.config.interval) + Proto::new( + self.config.initial_delay, + self.config.interval, + self.config.local_public_key.clone(), + self.config.protocol_version.clone(), + self.config.agent_version.clone(), + ) } fn on_connection_handler_event( &mut self, peer_id: PeerId, - connection: ConnectionId, + _connection: ConnectionId, event: <::Handler as ConnectionHandler>::OutEvent, ) { match event { @@ -272,19 +272,7 @@ impl NetworkBehaviour for Behaviour { })); } handler::Event::Identify => { - let observed = self - .connected - .get(&peer_id) - .and_then(|addrs| addrs.get(&connection)) - .expect( - "`on_connection_handler_event` is only called \ - with an established connection and calling `NetworkBehaviour::on_event` \ - with `FromSwarm::ConnectionEstablished ensures there is an entry; qed", - ); - self.requests.push_back(Request { - peer: peer_id, - observed: observed.clone(), - }); + self.requests.push_back(peer_id); } handler::Event::IdentificationError(error) => { self.events @@ -339,15 +327,11 @@ impl NetworkBehaviour for Behaviour { }); } - // Check for pending requests to send back to the handler for reply. - if let Some(Request { peer, observed }) = self.requests.pop_front() { - let info = Info { + // Check for information requests from the handlers. + if let Some(peer) = self.requests.pop_front() { + let info = BehaviourInfo { listen_addrs: listen_addrs(params), protocols: supported_protocols(params), - public_key: self.config.local_public_key.clone(), - protocol_version: self.config.protocol_version.clone(), - agent_version: self.config.agent_version.clone(), - observed_addr: observed, }; return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id: peer, diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 63b8e58261d..0aa51b66ac6 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -26,7 +26,7 @@ use futures::prelude::*; use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade}; -use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; @@ -42,13 +42,25 @@ use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; pub struct Proto { initial_delay: Duration, interval: Duration, + public_key: PublicKey, + protocol_version: String, + agent_version: String, } impl Proto { - pub fn new(initial_delay: Duration, interval: Duration) -> Self { + pub fn new( + initial_delay: Duration, + interval: Duration, + public_key: PublicKey, + protocol_version: String, + agent_version: String, + ) -> Self { Proto { initial_delay, interval, + public_key, + protocol_version, + agent_version, } } } @@ -56,8 +68,21 @@ impl Proto { impl IntoConnectionHandler for Proto { type Handler = Handler; - fn into_handler(self, remote_peer_id: &PeerId, _endpoint: &ConnectedPoint) -> Self::Handler { - Handler::new(self.initial_delay, self.interval, *remote_peer_id) + fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + let observed_addr = match endpoint { + ConnectedPoint::Dialer { address, .. } => address, + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + }; + + Handler::new( + self.initial_delay, + self.interval, + *remote_peer_id, + self.public_key, + self.protocol_version, + self.agent_version, + observed_addr.clone(), + ) } fn inbound_protocol(&self) -> ::InboundProtocol { @@ -94,9 +119,6 @@ pub struct Handler { >; 4], >, - /// Identify request information. - info: Option, - /// Pending replies to send. pending_replies: VecDeque, @@ -108,6 +130,33 @@ pub struct Handler { /// The interval of `trigger_next_identify`, i.e. the recurrent delay. interval: Duration, + + /// The public key of the local peer. + public_key: PublicKey, + + /// Application-specific version of the protocol family used by the peer, + /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`. + protocol_version: String, + + /// Name and version of the peer, similar to the `User-Agent` header in + /// the HTTP protocol. + agent_version: String, + + /// Address observed by or for the remote. + observed_addr: Multiaddr, + + /// Information provided by the `Behaviour` upon requesting. + behaviour_info: Option, +} + +/// Information provided by the `Behaviour` upon requesting. +#[derive(Debug)] +pub struct BehaviourInfo { + /// The addresses that the peer is listening on. + pub listen_addrs: Vec, + + /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. + pub protocols: Vec, } /// Event produced by the `Handler`. @@ -132,21 +181,33 @@ pub enum InEvent { /// Identifying information of the local node that is pushed to a remote. Push(Info), /// Identifying information requested from this node. - Identify(Info), + Identify(BehaviourInfo), } impl Handler { /// Creates a new `Handler`. - pub fn new(initial_delay: Duration, interval: Duration, remote_peer_id: PeerId) -> Self { + pub fn new( + initial_delay: Duration, + interval: Duration, + remote_peer_id: PeerId, + public_key: PublicKey, + protocol_version: String, + agent_version: String, + observed_addr: Multiaddr, + ) -> Self { Self { remote_peer_id, inbound_identify_push: Default::default(), events: SmallVec::new(), - info: None, pending_replies: VecDeque::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, interval, + public_key, + protocol_version, + agent_version, + observed_addr, + behaviour_info: None, } } @@ -161,9 +222,9 @@ impl Handler { ) { match output { EitherOutput::First(substream) => { - // If we already have `Info` we can proceed responding to the Identify request, - // if not, we request `Info` from the behaviour. - if self.info.is_none() { + // If we already have `BehaviourInfo` we can proceed responding to the Identify request, + // if not, we request it . + if self.behaviour_info.is_none() { self.events .push(ConnectionHandlerEvent::Custom(Event::Identify)); } @@ -259,7 +320,7 @@ impl ConnectionHandler for Handler { }); } InEvent::Identify(info) => { - self.info = Some(info); + self.behaviour_info = Some(info); } } } @@ -303,11 +364,19 @@ impl ConnectionHandler for Handler { } // Check for pending replies to send. - if let Some(ref info) = self.info { + if let Some(ref info) = self.behaviour_info { if let Some(mut pending) = self.pending_replies.pop_front() { loop { match pending { Pending::Queued(io) => { + let info = Info { + public_key: self.public_key.clone(), + protocol_version: self.protocol_version.clone(), + agent_version: self.agent_version.clone(), + listen_addrs: info.listen_addrs.clone(), + protocols: info.protocols.clone(), + observed_addr: self.observed_addr.clone(), + }; let io = Box::pin(io.send(info.clone())); pending = Pending::Sending { peer: self.remote_peer_id, From 0db0c52f308765670bb09fb8cb937650a257acad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 11 Dec 2022 19:03:33 +0000 Subject: [PATCH 07/14] review: handler, request info on each new substream, instead of caching the behaviour info. --- protocols/identify/src/behaviour.rs | 25 ++++-- protocols/identify/src/handler.rs | 117 ++++++++++++---------------- 2 files changed, 68 insertions(+), 74 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 2068555e794..7ccd1c0b413 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -49,7 +49,7 @@ pub struct Behaviour { /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, /// Information requests from the handlers to be fullfiled. - requests: VecDeque, + requests: VecDeque, /// Pending events to be emitted when polled. events: VecDeque>, /// Peers to which an active push with current information about @@ -59,6 +59,12 @@ pub struct Behaviour { discovered_peers: PeerCache, } +/// A `Handler` request for `BehaviourInfo`. +struct Request { + peer_id: PeerId, + connection_id: ConnectionId, +} + /// Configuration for the [`identify::Behaviour`](Behaviour). #[non_exhaustive] #[derive(Debug, Clone)] @@ -234,7 +240,7 @@ impl NetworkBehaviour for Behaviour { fn on_connection_handler_event( &mut self, peer_id: PeerId, - _connection: ConnectionId, + connection_id: ConnectionId, event: <::Handler as ConnectionHandler>::OutEvent, ) { match event { @@ -272,7 +278,10 @@ impl NetworkBehaviour for Behaviour { })); } handler::Event::Identify => { - self.requests.push_back(peer_id); + self.requests.push_back(Request { + peer_id, + connection_id, + }); } handler::Event::IdentificationError(error) => { self.events @@ -328,14 +337,18 @@ impl NetworkBehaviour for Behaviour { } // Check for information requests from the handlers. - if let Some(peer) = self.requests.pop_front() { + if let Some(Request { + peer_id, + connection_id, + }) = self.requests.pop_front() + { let info = BehaviourInfo { listen_addrs: listen_addrs(params), protocols: supported_protocols(params), }; return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id: peer, - handler: NotifyHandler::Any, + peer_id, + handler: NotifyHandler::One(connection_id), event: InEvent::Identify(info), }); } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 0aa51b66ac6..72f28970e1f 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -90,15 +90,10 @@ impl IntoConnectionHandler for Proto { } } -/// A pending reply to an inbound identification request. -enum Pending { - /// The reply is queued for sending. - Queued(ReplySubstream), - /// The reply is being sent. - Sending { - peer: PeerId, - io: Pin> + Send>>, - }, +/// A reply to an inbound identification request. +struct Sending { + peer: PeerId, + io: Pin> + Send>>, } /// Protocol handler for sending and receiving identification requests. @@ -119,8 +114,11 @@ pub struct Handler { >; 4], >, - /// Pending replies to send. - pending_replies: VecDeque, + /// Streams awaiting `BehaviourInfo` to then send identify requests. + reply_streams: VecDeque>, + + /// Pending identification replies, awaiting being sent. + pending_replies: VecDeque, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -144,9 +142,6 @@ pub struct Handler { /// Address observed by or for the remote. observed_addr: Multiaddr, - - /// Information provided by the `Behaviour` upon requesting. - behaviour_info: Option, } /// Information provided by the `Behaviour` upon requesting. @@ -199,6 +194,7 @@ impl Handler { remote_peer_id, inbound_identify_push: Default::default(), events: SmallVec::new(), + reply_streams: VecDeque::new(), pending_replies: VecDeque::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, @@ -207,7 +203,6 @@ impl Handler { protocol_version, agent_version, observed_addr, - behaviour_info: None, } } @@ -222,20 +217,16 @@ impl Handler { ) { match output { EitherOutput::First(substream) => { - // If we already have `BehaviourInfo` we can proceed responding to the Identify request, - // if not, we request it . - if self.behaviour_info.is_none() { - self.events - .push(ConnectionHandlerEvent::Custom(Event::Identify)); - } - if !self.pending_replies.is_empty() { + self.events + .push(ConnectionHandlerEvent::Custom(Event::Identify)); + if !self.reply_streams.is_empty() { warn!( "New inbound identify request from {} while a previous one \ is still pending. Queueing the new one.", self.remote_peer_id, ); } - self.pending_replies.push_back(Pending::Queued(substream)); + self.reply_streams.push_back(substream); } EitherOutput::Second(fut) => { if self.inbound_identify_push.replace(fut).is_some() { @@ -319,8 +310,24 @@ impl ConnectionHandler for Handler { ), }); } - InEvent::Identify(info) => { - self.behaviour_info = Some(info); + InEvent::Identify(behaviour_info) => { + let info = Info { + public_key: self.public_key.clone(), + protocol_version: self.protocol_version.clone(), + agent_version: self.agent_version.clone(), + listen_addrs: behaviour_info.listen_addrs, + protocols: behaviour_info.protocols, + observed_addr: self.observed_addr.clone(), + }; + let substream = self + .reply_streams + .pop_front() + .expect("A BehaviourInfo reply should have a matching substream."); + let io = Box::pin(substream.send(info)); + self.pending_replies.push_back(Sending { + peer: self.remote_peer_id, + io, + }); } } } @@ -364,49 +371,23 @@ impl ConnectionHandler for Handler { } // Check for pending replies to send. - if let Some(ref info) = self.behaviour_info { - if let Some(mut pending) = self.pending_replies.pop_front() { - loop { - match pending { - Pending::Queued(io) => { - let info = Info { - public_key: self.public_key.clone(), - protocol_version: self.protocol_version.clone(), - agent_version: self.agent_version.clone(), - listen_addrs: info.listen_addrs.clone(), - protocols: info.protocols.clone(), - observed_addr: self.observed_addr.clone(), - }; - let io = Box::pin(io.send(info.clone())); - pending = Pending::Sending { - peer: self.remote_peer_id, - io, - }; - } - Pending::Sending { peer, mut io } => { - match Future::poll(Pin::new(&mut io), cx) { - Poll::Pending => { - self.pending_replies - .push_front(Pending::Sending { peer, io }); - return Poll::Pending; - } - Poll::Ready(Ok(())) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::Identification(peer), - )); - } - Poll::Ready(Err(err)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::IdentificationError( - ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - ), - ), - )) - } - } - } - } + if let Some(mut sending) = self.pending_replies.pop_front() { + match Future::poll(Pin::new(&mut sending.io), cx) { + Poll::Pending => { + self.pending_replies.push_front(sending); + return Poll::Pending; + } + Poll::Ready(Ok(())) => { + return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identification( + sending.peer, + ))); + } + Poll::Ready(Err(err)) => { + return Poll::Ready(ConnectionHandlerEvent::Custom(Event::IdentificationError( + ConnectionHandlerUpgrErr::Upgrade( + libp2p_core::upgrade::UpgradeError::Apply(err), + ), + ))) } } } From aa0d81ad6282772d41d3004aebcd375ba6dc2623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 12 Dec 2022 00:36:34 +0000 Subject: [PATCH 08/14] review: use FuturesUnordered for pending replies. --- protocols/identify/src/handler.rs | 51 +++++++++++-------------------- 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 72f28970e1f..8c21cbb49e5 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -23,6 +23,7 @@ use crate::protocol::{ }; use futures::future::BoxFuture; use futures::prelude::*; +use futures::stream::FuturesUnordered; use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade}; @@ -90,12 +91,6 @@ impl IntoConnectionHandler for Proto { } } -/// A reply to an inbound identification request. -struct Sending { - peer: PeerId, - io: Pin> + Send>>, -} - /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects @@ -118,7 +113,7 @@ pub struct Handler { reply_streams: VecDeque>, /// Pending identification replies, awaiting being sent. - pending_replies: VecDeque, + pending_replies: FuturesUnordered>>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -195,7 +190,7 @@ impl Handler { inbound_identify_push: Default::default(), events: SmallVec::new(), reply_streams: VecDeque::new(), - pending_replies: VecDeque::new(), + pending_replies: FuturesUnordered::new(), trigger_next_identify: Delay::new(initial_delay), keep_alive: KeepAlive::Yes, interval, @@ -323,11 +318,12 @@ impl ConnectionHandler for Handler { .reply_streams .pop_front() .expect("A BehaviourInfo reply should have a matching substream."); - let io = Box::pin(substream.send(info)); - self.pending_replies.push_back(Sending { - peer: self.remote_peer_id, - io, + let peer = self.remote_peer_id; + let fut = Box::pin(async move { + substream.send(info).await?; + Ok(peer) }); + self.pending_replies.push(fut); } } } @@ -371,28 +367,17 @@ impl ConnectionHandler for Handler { } // Check for pending replies to send. - if let Some(mut sending) = self.pending_replies.pop_front() { - match Future::poll(Pin::new(&mut sending.io), cx) { - Poll::Pending => { - self.pending_replies.push_front(sending); - return Poll::Pending; - } - Poll::Ready(Ok(())) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identification( - sending.peer, - ))); - } - Poll::Ready(Err(err)) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::IdentificationError( - ConnectionHandlerUpgrErr::Upgrade( - libp2p_core::upgrade::UpgradeError::Apply(err), - ), - ))) - } - } + match self.pending_replies.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => Poll::Ready(ConnectionHandlerEvent::Custom( + Event::Identification(peer_id), + )), + Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom( + Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade( + libp2p_core::upgrade::UpgradeError::Apply(err), + )), + )), + Poll::Ready(None) | Poll::Pending => Poll::Pending, } - - Poll::Pending } fn on_connection_event( From fa05a2ea104bbf373e3a7e6f43a3766c79477196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 12 Dec 2022 17:14:50 +0000 Subject: [PATCH 09/14] review: deprecate ReplySubstream. --- protocols/identify/src/handler.rs | 6 ++-- protocols/identify/src/protocol.rs | 44 ++++++++---------------------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 8c21cbb49e5..e8864a9c4f0 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::protocol::{ - InboundPush, Info, OutboundPush, Protocol, PushProtocol, ReplySubstream, UpgradeError, + self, InboundPush, Info, OutboundPush, Protocol, PushProtocol, UpgradeError, }; use futures::future::BoxFuture; use futures::prelude::*; @@ -110,7 +110,7 @@ pub struct Handler { >, /// Streams awaiting `BehaviourInfo` to then send identify requests. - reply_streams: VecDeque>, + reply_streams: VecDeque, /// Pending identification replies, awaiting being sent. pending_replies: FuturesUnordered>>, @@ -320,7 +320,7 @@ impl ConnectionHandler for Handler { .expect("A BehaviourInfo reply should have a matching substream."); let peer = self.remote_peer_id; let fut = Box::pin(async move { - substream.send(info).await?; + protocol::send(substream, info).await?; Ok(peer) }); self.pending_replies.push(fut); diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 1a969c6a967..6092785e4b2 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -28,7 +28,7 @@ use libp2p_core::{ }; use log::trace; use std::convert::TryFrom; -use std::{fmt, io, iter, pin::Pin}; +use std::{io, iter, pin::Pin}; use thiserror::Error; use void::Void; @@ -79,30 +79,6 @@ pub struct Info { pub observed_addr: Multiaddr, } -/// The substream on which a reply is expected to be sent. -pub struct ReplySubstream { - inner: T, -} - -impl fmt::Debug for ReplySubstream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReplySubstream").finish() - } -} - -impl ReplySubstream -where - T: AsyncWrite + Unpin, -{ - /// Sends back the requested information on the substream. - /// - /// Consumes the substream, returning a future that resolves - /// when the reply has been sent on the underlying connection. - pub async fn send(self, info: Info) -> Result<(), UpgradeError> { - send(self.inner, info).await.map_err(Into::into) - } -} - impl UpgradeInfo for Protocol { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -113,12 +89,12 @@ impl UpgradeInfo for Protocol { } impl InboundUpgrade for Protocol { - type Output = ReplySubstream; + type Output = C; type Error = UpgradeError; type Future = future::Ready>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - future::ok(ReplySubstream { inner: socket }) + future::ok(socket) } } @@ -171,7 +147,7 @@ where } } -async fn send(io: T, info: Info) -> Result<(), UpgradeError> +pub(crate) async fn send(io: T, info: Info) -> Result<(), UpgradeError> where T: AsyncWrite + Unpin, { @@ -318,8 +294,9 @@ mod tests { let sender = apply_inbound(socket, Protocol).await.unwrap(); - sender - .send(Info { + send( + sender, + Info { public_key: send_pubkey, protocol_version: "proto_version".to_owned(), agent_version: "agent_version".to_owned(), @@ -329,9 +306,10 @@ mod tests { ], protocols: vec!["proto1".to_string(), "proto2".to_string()], observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(), - }) - .await - .unwrap(); + }, + ) + .await + .unwrap(); }); async_std::task::block_on(async move { From 7aab6a647034cdb219f7bba6ac092224a341e095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 12 Dec 2022 12:06:16 +0000 Subject: [PATCH 10/14] review: rename Protocol and ProtocolPush to Identify and Push. --- protocols/identify/src/handler.rs | 24 +++++++++--------------- protocols/identify/src/protocol.rs | 28 ++++++++++++++-------------- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index e8864a9c4f0..bf52c457312 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,9 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{ - self, InboundPush, Info, OutboundPush, Protocol, PushProtocol, UpgradeError, -}; +use crate::protocol::{self, Identify, InboundPush, Info, OutboundPush, Push, UpgradeError}; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesUnordered; @@ -87,7 +85,7 @@ impl IntoConnectionHandler for Proto { } fn inbound_protocol(&self) -> ::InboundProtocol { - SelectUpgrade::new(Protocol, PushProtocol::inbound()) + SelectUpgrade::new(Identify, Push::inbound()) } } @@ -101,12 +99,8 @@ pub struct Handler { inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< - [ConnectionHandlerEvent< - EitherUpgrade>, - (), - Event, - io::Error, - >; 4], + [ConnectionHandlerEvent>, (), Event, io::Error>; + 4], >, /// Streams awaiting `BehaviourInfo` to then send identify requests. @@ -285,13 +279,13 @@ impl ConnectionHandler for Handler { type InEvent = InEvent; type OutEvent = Event; type Error = io::Error; - type InboundProtocol = SelectUpgrade>; - type OutboundProtocol = EitherUpgrade>; + type InboundProtocol = SelectUpgrade>; + type OutboundProtocol = EitherUpgrade>; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ()) + SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ()) } fn on_behaviour_event(&mut self, event: Self::InEvent) { @@ -300,7 +294,7 @@ impl ConnectionHandler for Handler { self.events .push(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( - EitherUpgrade::B(PushProtocol::outbound(push)), + EitherUpgrade::B(Push::outbound(push)), (), ), }); @@ -348,7 +342,7 @@ impl ConnectionHandler for Handler { Poll::Ready(()) => { self.trigger_next_identify.reset(self.interval); let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(EitherUpgrade::A(Protocol), ()), + protocol: SubstreamProtocol::new(EitherUpgrade::A(Identify), ()), }; return Poll::Ready(ev); } diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 6092785e4b2..df0ea2b4fc5 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -40,23 +40,23 @@ pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; /// Substream upgrade protocol for `/ipfs/id/1.0.0`. #[derive(Debug, Clone)] -pub struct Protocol; +pub struct Identify; /// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. #[derive(Debug, Clone)] -pub struct PushProtocol(T); +pub struct Push(T); pub struct InboundPush(); pub struct OutboundPush(Info); -impl PushProtocol { +impl Push { pub fn inbound() -> Self { - PushProtocol(InboundPush()) + Push(InboundPush()) } } -impl PushProtocol { +impl Push { pub fn outbound(info: Info) -> Self { - PushProtocol(OutboundPush(info)) + Push(OutboundPush(info)) } } @@ -79,7 +79,7 @@ pub struct Info { pub observed_addr: Multiaddr, } -impl UpgradeInfo for Protocol { +impl UpgradeInfo for Identify { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -88,7 +88,7 @@ impl UpgradeInfo for Protocol { } } -impl InboundUpgrade for Protocol { +impl InboundUpgrade for Identify { type Output = C; type Error = UpgradeError; type Future = future::Ready>; @@ -98,7 +98,7 @@ impl InboundUpgrade for Protocol { } } -impl OutboundUpgrade for Protocol +impl OutboundUpgrade for Identify where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -111,7 +111,7 @@ where } } -impl UpgradeInfo for PushProtocol { +impl UpgradeInfo for Push { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -120,7 +120,7 @@ impl UpgradeInfo for PushProtocol { } } -impl InboundUpgrade for PushProtocol +impl InboundUpgrade for Push where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -134,7 +134,7 @@ where } } -impl OutboundUpgrade for PushProtocol +impl OutboundUpgrade for Push where C: AsyncWrite + Unpin + Send + 'static, { @@ -292,7 +292,7 @@ mod tests { .await .unwrap(); - let sender = apply_inbound(socket, Protocol).await.unwrap(); + let sender = apply_inbound(socket, Identify).await.unwrap(); send( sender, @@ -316,7 +316,7 @@ mod tests { let mut transport = tcp::async_io::Transport::default(); let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let info = apply_outbound(socket, Protocol, upgrade::Version::V1) + let info = apply_outbound(socket, Identify, upgrade::Version::V1) .await .unwrap(); assert_eq!( From 963f11dd9b923313ff0b523bf60c1e606ec8d6c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 12 Dec 2022 12:46:42 +0000 Subject: [PATCH 11/14] review: merge BehaviorInfo and InEvent, into a single struct with the corresponding Substream protocol. --- protocols/identify/src/behaviour.rs | 51 +++++++++++--------------- protocols/identify/src/handler.rs | 56 +++++++++++++++-------------- protocols/identify/src/protocol.rs | 7 ++++ 3 files changed, 57 insertions(+), 57 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 7ccd1c0b413..5cb9b849a52 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -18,10 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::{self, BehaviourInfo, InEvent, Proto}; -use crate::protocol::{Info, UpgradeError}; +use crate::handler::{self, InEvent, Proto}; +use crate::protocol::{Info, Protocol, UpgradeError}; use libp2p_core::{ - connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, + connection::ConnectionId, multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ @@ -304,26 +304,15 @@ impl NetworkBehaviour for Behaviour { // Check for a pending active push to perform. let peer_push = self.pending_push.iter().find_map(|peer| { - self.connected.get(peer).map(|conns| { - let observed_addr = conns - .values() - .next() - .expect("connected peer has a connection") - .clone(); - - let listen_addrs = listen_addrs(params); - let protocols = supported_protocols(params); - - let info = Info { - public_key: self.config.local_public_key.clone(), - protocol_version: self.config.protocol_version.clone(), - agent_version: self.config.agent_version.clone(), - listen_addrs, - protocols, - observed_addr, - }; - - (*peer, InEvent::Push(info)) + self.connected.get(peer).map(|_| { + ( + *peer, + InEvent { + listen_addrs: listen_addrs(params), + supported_protocols: supported_protocols(params), + protocol: Protocol::Push, + }, + ) }) }); @@ -342,14 +331,14 @@ impl NetworkBehaviour for Behaviour { connection_id, }) = self.requests.pop_front() { - let info = BehaviourInfo { - listen_addrs: listen_addrs(params), - protocols: supported_protocols(params), - }; return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), - event: InEvent::Identify(info), + event: InEvent { + listen_addrs: listen_addrs(params), + supported_protocols: supported_protocols(params), + protocol: Protocol::Identify, + }, }); } @@ -465,7 +454,7 @@ fn listen_addrs(params: &impl PollParameters) -> Vec { /// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true. fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { let last_component = addr.iter().last(); - if let Some(Protocol::P2p(multi_addr_peer_id)) = last_component { + if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component { return multi_addr_peer_id == *peer_id.as_ref(); } true @@ -792,8 +781,8 @@ mod tests { let addr_without_peer_id: Multiaddr = addr.clone(); let mut addr_with_other_peer_id = addr.clone(); - addr.push(Protocol::P2p(peer_id.into())); - addr_with_other_peer_id.push(Protocol::P2p(other_peer_id.into())); + addr.push(multiaddr::Protocol::P2p(peer_id.into())); + addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id.into())); assert!(multiaddr_matches_peer_id(&addr, &peer_id)); assert!(!multiaddr_matches_peer_id( diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index bf52c457312..53b4e15dae4 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,7 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{self, Identify, InboundPush, Info, OutboundPush, Push, UpgradeError}; +use crate::protocol::{ + self, Identify, InboundPush, Info, OutboundPush, Protocol, Push, UpgradeError, +}; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesUnordered; @@ -133,14 +135,17 @@ pub struct Handler { observed_addr: Multiaddr, } -/// Information provided by the `Behaviour` upon requesting. +/// An event from `Behaviour` with the information requested by the `Handler`. #[derive(Debug)] -pub struct BehaviourInfo { +pub struct InEvent { /// The addresses that the peer is listening on. pub listen_addrs: Vec, /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. - pub protocols: Vec, + pub supported_protocols: Vec, + + /// The protocol w.r.t. the information requested. + pub protocol: Protocol, } /// Event produced by the `Handler`. @@ -159,15 +164,6 @@ pub enum Event { IdentificationError(ConnectionHandlerUpgrErr), } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -pub enum InEvent { - /// Identifying information of the local node that is pushed to a remote. - Push(Info), - /// Identifying information requested from this node. - Identify(BehaviourInfo), -} - impl Handler { /// Creates a new `Handler`. pub fn new( @@ -288,26 +284,34 @@ impl ConnectionHandler for Handler { SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ()) } - fn on_behaviour_event(&mut self, event: Self::InEvent) { - match event { - InEvent::Push(push) => { + fn on_behaviour_event( + &mut self, + InEvent { + listen_addrs, + supported_protocols, + protocol, + }: Self::InEvent, + ) { + let info = Info { + public_key: self.public_key.clone(), + protocol_version: self.protocol_version.clone(), + agent_version: self.agent_version.clone(), + listen_addrs, + protocols: supported_protocols, + observed_addr: self.observed_addr.clone(), + }; + + match protocol { + Protocol::Push => { self.events .push(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( - EitherUpgrade::B(Push::outbound(push)), + EitherUpgrade::B(Push::outbound(info)), (), ), }); } - InEvent::Identify(behaviour_info) => { - let info = Info { - public_key: self.public_key.clone(), - protocol_version: self.protocol_version.clone(), - agent_version: self.agent_version.clone(), - listen_addrs: behaviour_info.listen_addrs, - protocols: behaviour_info.protocols, - observed_addr: self.observed_addr.clone(), - }; + Protocol::Identify => { let substream = self .reply_streams .pop_front() diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index df0ea2b4fc5..99818a3e794 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -38,6 +38,13 @@ pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0"; pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; +/// The type of the Substream protocol. +#[derive(Debug)] +pub enum Protocol { + Identify, + Push, +} + /// Substream upgrade protocol for `/ipfs/id/1.0.0`. #[derive(Debug, Clone)] pub struct Identify; From 8e88bf6f65c927c0cfef21aa85b97afc7c5908f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 12 Dec 2022 15:42:59 +0000 Subject: [PATCH 12/14] review: merge Behaviour pending_push into requests, to better balance incoming requests and fullfil them by order. --- protocols/identify/src/behaviour.rs | 115 +++++++++++++++------------- protocols/identify/src/handler.rs | 2 +- protocols/identify/src/protocol.rs | 5 +- 3 files changed, 66 insertions(+), 56 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 5cb9b849a52..98ac26a5e2f 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -48,21 +48,23 @@ pub struct Behaviour { config: Config, /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, - /// Information requests from the handlers to be fullfiled. - requests: VecDeque, + /// Pending requests to be fullfiled, either `Handler` requests for `Behaviour` info + /// to address identification requests, or push requests to peers + /// with current information about the local peer. + requests: Vec, /// Pending events to be emitted when polled. events: VecDeque>, - /// Peers to which an active push with current information about - /// the local peer should be sent. - pending_push: HashSet, /// The addresses of all peers that we have discovered. discovered_peers: PeerCache, } -/// A `Handler` request for `BehaviourInfo`. +/// A `Behaviour` request to be fullfiled, either `Handler` requests for `Behaviour` info +/// to address identification requests, or push requests to peers +/// with current information about the local peer. +#[derive(Debug, PartialEq, Eq)] struct Request { peer_id: PeerId, - connection_id: ConnectionId, + protocol: Protocol, } /// Configuration for the [`identify::Behaviour`](Behaviour). @@ -172,9 +174,8 @@ impl Behaviour { Self { config, connected: HashMap::new(), - requests: VecDeque::new(), + requests: Vec::new(), events: VecDeque::new(), - pending_push: HashSet::new(), discovered_peers, } } @@ -185,7 +186,13 @@ impl Behaviour { I: IntoIterator, { for p in peers { - if self.pending_push.insert(p) && !self.connected.contains_key(&p) { + let request = Request { + peer_id: p, + protocol: Protocol::Push, + }; + if !self.requests.contains(&request) { + self.requests.push(request); + let handler = self.new_handler(); self.events.push_back(NetworkBehaviourAction::Dial { opts: DialOpts::peer_id(p).build(), @@ -278,9 +285,9 @@ impl NetworkBehaviour for Behaviour { })); } handler::Event::Identify => { - self.requests.push_back(Request { + self.requests.push(Request { peer_id, - connection_id, + protocol: Protocol::Identify(connection_id), }); } handler::Event::IdentificationError(error) => { @@ -302,47 +309,34 @@ impl NetworkBehaviour for Behaviour { return Poll::Ready(event); } - // Check for a pending active push to perform. - let peer_push = self.pending_push.iter().find_map(|peer| { - self.connected.get(peer).map(|_| { - ( - *peer, - InEvent { - listen_addrs: listen_addrs(params), - supported_protocols: supported_protocols(params), - protocol: Protocol::Push, - }, - ) - }) - }); - - if let Some((peer_id, push)) = peer_push { - self.pending_push.remove(&peer_id); - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + // Check for pending requests. + match self.requests.pop() { + Some(Request { + peer_id, + protocol: Protocol::Push, + }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, - event: push, handler: NotifyHandler::Any, - }); - } - - // Check for information requests from the handlers. - if let Some(Request { - peer_id, - connection_id, - }) = self.requests.pop_front() - { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + event: InEvent { + listen_addrs: listen_addrs(params), + supported_protocols: supported_protocols(params), + protocol: Protocol::Push, + }, + }), + Some(Request { + peer_id, + protocol: Protocol::Identify(connection_id), + }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: InEvent { listen_addrs: listen_addrs(params), supported_protocols: supported_protocols(params), - protocol: Protocol::Identify, + protocol: Protocol::Identify(connection_id), }, - }); + }), + None => Poll::Pending, } - - Poll::Pending } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { @@ -362,7 +356,13 @@ impl NetworkBehaviour for Behaviour { }) => { if remaining_established == 0 { self.connected.remove(&peer_id); - self.pending_push.remove(&peer_id); + self.requests.retain(|request| { + request + != &Request { + peer_id, + protocol: Protocol::Push, + } + }); } else if let Some(addrs) = self.connected.get_mut(&peer_id) { addrs.remove(&connection_id); } @@ -370,7 +370,13 @@ impl NetworkBehaviour for Behaviour { FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { if let Some(peer_id) = peer_id { if !self.connected.contains_key(&peer_id) { - self.pending_push.remove(&peer_id); + self.requests.retain(|request| { + request + != &Request { + peer_id, + protocol: Protocol::Push, + } + }); } } @@ -382,14 +388,17 @@ impl NetworkBehaviour for Behaviour { } } } - FromSwarm::NewListenAddr(_) => { + FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => { if self.config.push_listen_addr_updates { - self.pending_push.extend(self.connected.keys()); - } - } - FromSwarm::ExpiredListenAddr(_) => { - if self.config.push_listen_addr_updates { - self.pending_push.extend(self.connected.keys()); + for p in self.connected.keys() { + let request = Request { + peer_id: *p, + protocol: Protocol::Push, + }; + if !self.requests.contains(&request) { + self.requests.push(request); + } + } } } FromSwarm::AddressChange(_) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 53b4e15dae4..21063acc661 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -311,7 +311,7 @@ impl ConnectionHandler for Handler { ), }); } - Protocol::Identify => { + Protocol::Identify(_) => { let substream = self .reply_streams .pop_front() diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 99818a3e794..d01cabde173 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -22,6 +22,7 @@ use crate::structs_proto; use asynchronous_codec::{FramedRead, FramedWrite}; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{ + connection::ConnectionId, identity, multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, Multiaddr, PublicKey, @@ -39,9 +40,9 @@ pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0"; pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; /// The type of the Substream protocol. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum Protocol { - Identify, + Identify(ConnectionId), Push, } From 60470d2ef3a10afbd212350a924dce90deb3402e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 13 Dec 2022 00:38:25 +0000 Subject: [PATCH 13/14] review: fix doc typos. --- protocols/identify/src/behaviour.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 133ce25860e..f37abfc0da7 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -48,7 +48,7 @@ pub struct Behaviour { config: Config, /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, - /// Pending requests to be fullfiled, either `Handler` requests for `Behaviour` info + /// Pending requests to be fulfilled, either `Handler` requests for `Behaviour` info /// to address identification requests, or push requests to peers /// with current information about the local peer. requests: Vec, @@ -58,7 +58,7 @@ pub struct Behaviour { discovered_peers: PeerCache, } -/// A `Behaviour` request to be fullfiled, either `Handler` requests for `Behaviour` info +/// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info /// to address identification requests, or push requests to peers /// with current information about the local peer. #[derive(Debug, PartialEq, Eq)] From d457cbd3f9ecf34f5ce5ec13ede698886add3443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 13 Dec 2022 18:43:52 +0000 Subject: [PATCH 14/14] review: add CHANGELOG.md entry --- protocols/identify/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index f85477d8434..61032f9b6a6 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,7 +1,12 @@ # 0.42.0 [unreleased] +- Move I/O from `Behaviour` to `Handler`. Handle `Behaviour`'s Identify and Push requests independently by incoming order, + previously Push requests were prioritized. see [PR 3208]. + - Update to `libp2p-swarm` `v0.42.0`. +[PR 3208]: https://github.com/libp2p/rust-libp2p/pull/3208 + # 0.41.0 - Change default `cache_size` of `Config` to 100. See [PR 2995].