Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move Identify I/O from NetworkBehaviour to ConnectionHandler #3208

Merged
merged 18 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 96 additions & 141 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::handler::{self, Proto, Push};
use crate::protocol::{Info, ReplySubstream, UpgradeError};
use futures::prelude::*;
use crate::handler::{self, InEvent, Proto};
use crate::protocol::{Info, Protocol, UpgradeError};
use libp2p_core::{
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
connection::ConnectionId, multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError,
IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use lru::LruCache;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
time::Duration,
Expand All @@ -51,30 +48,23 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending requests to be fullfiled, either `Handler` requests for `Behaviour` info
/// to address identification requests, or push requests to peers
/// with current information about the local peer.
requests: Vec<Request>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
/// The addresses of all peers that we have discovered.
discovered_peers: PeerCache,
}

/// A pending reply to an inbound identification request.
enum Reply {
/// The reply is queued for sending.
Queued {
peer: PeerId,
io: ReplySubstream<NegotiatedSubstream>,
observed: Multiaddr,
},
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
/// A `Behaviour` request to be fullfiled, either `Handler` requests for `Behaviour` info
/// to address identification requests, or push requests to peers
/// with current information about the local peer.
#[derive(Debug, PartialEq, Eq)]
struct Request {
peer_id: PeerId,
protocol: Protocol,
}

/// Configuration for the [`identify::Behaviour`](Behaviour).
Expand Down Expand Up @@ -184,9 +174,8 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
requests: Vec::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
}
}
Expand All @@ -197,7 +186,13 @@ impl Behaviour {
I: IntoIterator<Item = PeerId>,
{
for p in peers {
if self.pending_push.insert(p) && !self.connected.contains_key(&p) {
let request = Request {
peer_id: p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);

let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(p).build(),
Expand Down Expand Up @@ -240,13 +235,19 @@ impl NetworkBehaviour for Behaviour {
type OutEvent = Event;

fn new_handler(&mut self) -> Self::ConnectionHandler {
Proto::new(self.config.initial_delay, self.config.interval)
Proto::new(
self.config.initial_delay,
self.config.interval,
self.config.local_public_key.clone(),
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
)
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
match event {
Expand All @@ -271,26 +272,22 @@ impl NetworkBehaviour for Behaviour {
score: AddressScore::Finite(1),
});
}
handler::Event::Identification(peer) => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
peer_id: peer,
}));
}
handler::Event::IdentificationPushed => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {
peer_id,
}));
}
handler::Event::Identify(sender) => {
let observed = self
.connected
.get(&peer_id)
.and_then(|addrs| addrs.get(&connection))
.expect(
"`on_connection_handler_event` is only called \
with an established connection and calling `NetworkBehaviour::on_event` \
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
);
self.pending_replies.push_back(Reply::Queued {
peer: peer_id,
io: sender,
observed: observed.clone(),
handler::Event::Identify => {
self.requests.push(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
});
}
handler::Event::IdentificationError(error) => {
Expand All @@ -305,99 +302,41 @@ impl NetworkBehaviour for Behaviour {

fn poll(
&mut self,
cx: &mut Context<'_>,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}

// Check for a pending active push to perform.
let peer_push = self.pending_push.iter().find_map(|peer| {
self.connected.get(peer).map(|conns| {
let observed_addr = conns
.values()
.next()
.expect("connected peer has a connection")
.clone();

let listen_addrs = listen_addrs(params);
let protocols = supported_protocols(params);

let info = Info {
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
listen_addrs,
protocols,
observed_addr,
};

(*peer, Push(info))
})
});

if let Some((peer_id, push)) = peer_push {
self.pending_push.remove(&peer_id);
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
// Check for pending requests.
match self.requests.pop() {
Some(Request {
peer_id,
protocol: Protocol::Push,
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: push,
handler: NotifyHandler::Any,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the curlpit! This will be buggy with > 1 connection per peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Thomas, addressed.

});
}

// Check for pending replies to send.
if let Some(r) = self.pending_replies.pop_front() {
let mut sending = 0;
let to_send = self.pending_replies.len() + 1;
let mut reply = Some(r);
loop {
match reply {
Some(Reply::Queued { peer, io, observed }) => {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
let io = Box::pin(io.send(info));
reply = Some(Reply::Sending { peer, io });
}
Some(Reply::Sending { peer, mut io }) => {
sending += 1;
match Future::poll(Pin::new(&mut io), cx) {
Poll::Ready(Ok(())) => {
let event = Event::Sent { peer_id: peer };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending => {
self.pending_replies.push_back(Reply::Sending { peer, io });
if sending == to_send {
// All remaining futures are NotReady
break;
} else {
reply = self.pending_replies.pop_front();
}
}
Poll::Ready(Err(err)) => {
let event = Event::Error {
peer_id: peer,
error: ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
None => unreachable!(),
}
}
event: InEvent {
listen_addrs: listen_addrs(params),
supported_protocols: supported_protocols(params),
protocol: Protocol::Push,
},
}),
Some(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: InEvent {
listen_addrs: listen_addrs(params),
supported_protocols: supported_protocols(params),
protocol: Protocol::Identify(connection_id),
},
}),
None => Poll::Pending,
}

Poll::Pending
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
Expand All @@ -417,15 +356,27 @@ impl NetworkBehaviour for Behaviour {
}) => {
if remaining_established == 0 {
self.connected.remove(&peer_id);
self.pending_push.remove(&peer_id);
self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
} else if let Some(addrs) = self.connected.get_mut(&peer_id) {
addrs.remove(&connection_id);
}
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id {
if !self.connected.contains_key(&peer_id) {
self.pending_push.remove(&peer_id);
self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
}
}

Expand All @@ -437,14 +388,17 @@ impl NetworkBehaviour for Behaviour {
}
}
}
FromSwarm::NewListenAddr(_) => {
if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys());
}
}
FromSwarm::ExpiredListenAddr(_) => {
FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => {
if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys());
for p in self.connected.keys() {
let request = Request {
peer_id: *p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);
}
}
}
}
FromSwarm::AddressChange(_)
Expand Down Expand Up @@ -509,7 +463,7 @@ fn listen_addrs(params: &impl PollParameters) -> Vec<Multiaddr> {
/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
let last_component = addr.iter().last();
if let Some(Protocol::P2p(multi_addr_peer_id)) = last_component {
if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
return multi_addr_peer_id == *peer_id.as_ref();
}
true
Expand Down Expand Up @@ -557,6 +511,7 @@ impl PeerCache {
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p::mplex::MplexConfig;
use libp2p::noise;
use libp2p::tcp;
Expand Down Expand Up @@ -618,7 +573,7 @@ mod tests {

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
Expand Down Expand Up @@ -835,8 +790,8 @@ mod tests {
let addr_without_peer_id: Multiaddr = addr.clone();
let mut addr_with_other_peer_id = addr.clone();

addr.push(Protocol::P2p(peer_id.into()));
addr_with_other_peer_id.push(Protocol::P2p(other_peer_id.into()));
addr.push(multiaddr::Protocol::P2p(peer_id.into()));
addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id.into()));

assert!(multiaddr_matches_peer_id(&addr, &peer_id));
assert!(!multiaddr_matches_peer_id(
Expand Down
Loading