Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/identify: Assist in peer discovery based on reported listen addresses from other peers #2232

Merged
merged 7 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

- Update dependencies.

- Assist in peer discovery by returning reported listen addresses
of other peers from `addresses_of_peer`.
[PR 2232](https://github.com/libp2p/rust-libp2p/pull/2232)

# 0.30.0 [2021-07-12]

- Update dependencies.
Expand Down
1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = "0.3.1"
libp2p-core = { version = "0.30.0", path = "../../core", default-features = false }
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
log = "0.4.1"
lru = "0.6"
prost = "0.8"
smallvec = "1.6.1"
wasm-timer = "0.2"
Expand Down
132 changes: 124 additions & 8 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ use libp2p_swarm::{
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
ProtocolsHandlerUpgrErr,
};
use lru::LruCache;
use std::{
collections::{HashMap, HashSet, VecDeque},
io,
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
Expand All @@ -57,6 +59,8 @@ pub struct Identify {
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
/// The addresses of all peers that we have discovered.
discovered_peers: LruCache<PeerId, HashSet<Multiaddr>>,
}

/// A pending reply to an inbound identification request.
Expand Down Expand Up @@ -109,6 +113,10 @@ pub struct IdentifyConfig {
///
/// Disabled by default.
pub push_listen_addr_updates: bool,

/// How many entries of discovered peers to keep before we discard
/// the least-recently used one.
pub cache_size: usize,
}

impl IdentifyConfig {
Expand All @@ -122,6 +130,7 @@ impl IdentifyConfig {
initial_delay: Duration::from_millis(500),
interval: Duration::from_secs(5 * 60),
push_listen_addr_updates: false,
cache_size: 100,
}
}

Expand Down Expand Up @@ -152,17 +161,29 @@ impl IdentifyConfig {
self.push_listen_addr_updates = b;
self
}

/// Configures the size of the LRU cache, caching addresses of discovered peers.
///
/// The [`Swarm`] may extend the set of addresses of an outgoing connection attempt via
mxinden marked this conversation as resolved.
Show resolved Hide resolved
/// [`<Identify as NetworkBehaviour>::addresses_of_peer`].
mxinden marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_cache_size(mut self, cache_size: usize) -> Self {
self.cache_size = cache_size;
self
}
}

impl Identify {
/// Creates a new `Identify` network behaviour.
pub fn new(config: IdentifyConfig) -> Self {
let discovered_peers = LruCache::new(config.cache_size);

Identify {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
}
}

Expand Down Expand Up @@ -254,6 +275,10 @@ impl NetworkBehaviour for Identify {
) {
match event {
IdentifyHandlerEvent::Identified(info) => {
mxinden marked this conversation as resolved.
Show resolved Hide resolved
// Replace existing addresses to prevent other peer from filling up our memory.
self.discovered_peers
.put(peer_id, HashSet::from_iter(info.listen_addrs.clone()));

let observed = info.observed_addr.clone();
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
IdentifyEvent::Received { peer_id, info },
Expand Down Expand Up @@ -388,6 +413,27 @@ impl NetworkBehaviour for Identify {

Poll::Pending
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.discovered_peers
.get(peer)
.cloned()
.map(|addr| Vec::from_iter(addr))
.unwrap_or_default()
}

fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
_: &dyn std::error::Error,
) {
if let Some(peer) = peer_id {
if let Some(entry) = self.discovered_peers.get_mut(peer) {
entry.remove(addr);
}
}
}
}

/// Event emitted by the `Identify` behaviour.
Expand Down Expand Up @@ -552,11 +598,7 @@ mod tests {

let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
);
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};
Expand All @@ -565,9 +607,7 @@ mod tests {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
.with_agent_version("b".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
Expand Down Expand Up @@ -626,4 +666,80 @@ mod tests {
}
})
}

#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();

let mut swarm1 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string()),
);

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let swarm1_peer_id = *swarm1.local_peer_id();

let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});

async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});

swarm2.dial_addr(listen_addr).unwrap();

// wait until we identified
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(IdentifyEvent::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});

swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();

// we should still be able to dial now!
swarm2.dial(&swarm1_peer_id).unwrap();

let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

assert_eq!(connected_peer, swarm1_peer_id);
}
}