Skip to content

Commit

Permalink
Make Identify assist in peer discovery
Browse files Browse the repository at this point in the history
Resolves libp2p#2216.
  • Loading branch information
thomaseizinger committed Sep 30, 2021
1 parent c13f033 commit 51f71cc
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 8 deletions.
1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = "0.3.1"
libp2p-core = { version = "0.30.0", path = "../../core", default-features = false }
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
log = "0.4.1"
lru = "0.6"
prost = "0.8"
smallvec = "1.6.1"
wasm-timer = "0.2"
Expand Down
135 changes: 127 additions & 8 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ use libp2p_swarm::{
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
ProtocolsHandlerUpgrErr,
};
use lru::LruCache;
use std::{
collections::{HashMap, HashSet, VecDeque},
io,
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
Expand All @@ -57,6 +59,8 @@ pub struct Identify {
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
/// The addresses of all peers that we have discovered.
discovered_peers: LruCache<PeerId, HashSet<Multiaddr>>,
}

/// A pending reply to an inbound identification request.
Expand Down Expand Up @@ -109,6 +113,10 @@ pub struct IdentifyConfig {
///
/// Disabled by default.
pub push_listen_addr_updates: bool,

/// How many entries of discovered peers to keep before we discard
/// the least-recently used one.
pub cache_size: usize,
}

impl IdentifyConfig {
Expand All @@ -122,6 +130,7 @@ impl IdentifyConfig {
initial_delay: Duration::from_millis(500),
interval: Duration::from_secs(5 * 60),
push_listen_addr_updates: false,
cache_size: 100,
}
}

Expand Down Expand Up @@ -152,17 +161,26 @@ impl IdentifyConfig {
self.push_listen_addr_updates = b;
self
}

/// Configures the size of the LRU cache for discovered peers.
pub fn with_cache_size(mut self, cache_size: usize) -> Self {
self.cache_size = cache_size;
self
}
}

impl Identify {
/// Creates a new `Identify` network behaviour.
pub fn new(config: IdentifyConfig) -> Self {
let discovered_peers = LruCache::new(config.cache_size);

Identify {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
}
}

Expand Down Expand Up @@ -254,6 +272,16 @@ impl NetworkBehaviour for Identify {
) {
match event {
IdentifyHandlerEvent::Identified(info) => {
match self.discovered_peers.get_mut(&peer_id) {
Some(entry) => {
entry.extend(info.listen_addrs.clone());
}
None => {
self.discovered_peers
.put(peer_id, HashSet::from_iter(info.listen_addrs.clone()));
}
}

let observed = info.observed_addr.clone();
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
IdentifyEvent::Received { peer_id, info },
Expand Down Expand Up @@ -388,6 +416,27 @@ impl NetworkBehaviour for Identify {

Poll::Pending
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.discovered_peers
.get(peer)
.cloned()
.map(|addr| Vec::from_iter(addr))
.unwrap_or_default()
}

fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
_: &dyn std::error::Error,
) {
if let Some(peer) = peer_id {
if let Some(entry) = self.discovered_peers.get_mut(peer) {
entry.remove(addr);
}
}
}
}

/// Event emitted by the `Identify` behaviour.
Expand Down Expand Up @@ -552,11 +601,7 @@ mod tests {

let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
);
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};
Expand All @@ -565,9 +610,7 @@ mod tests {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
.with_agent_version("b".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
Expand Down Expand Up @@ -626,4 +669,80 @@ mod tests {
}
})
}

#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();

let mut swarm1 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string()),
);

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let swarm1_peer_id = *swarm1.local_peer_id();

let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});

async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});

swarm2.dial_addr(listen_addr).unwrap();

// wait until we identified
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(IdentifyEvent::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});

swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();

// we should still be able to dial now!
swarm2.dial(&swarm1_peer_id).unwrap();

let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

assert_eq!(connected_peer, swarm1_peer_id);
}
}

0 comments on commit 51f71cc

Please sign in to comment.