Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/identify: Assist in peer discovery based on reported listen addresses from other peers #2232

Merged
merged 7 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = "0.3.1"
libp2p-core = { version = "0.30.0", path = "../../core", default-features = false }
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
log = "0.4.1"
lru = "0.6"
prost = "0.8"
smallvec = "1.6.1"
wasm-timer = "0.2"
Expand Down
135 changes: 127 additions & 8 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ use libp2p_swarm::{
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
ProtocolsHandlerUpgrErr,
};
use lru::LruCache;
use std::{
collections::{HashMap, HashSet, VecDeque},
io,
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
Expand All @@ -57,6 +59,8 @@ pub struct Identify {
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
/// The addresses of all peers that we have discovered.
discovered_peers: LruCache<PeerId, HashSet<Multiaddr>>,
}

/// A pending reply to an inbound identification request.
Expand Down Expand Up @@ -109,6 +113,10 @@ pub struct IdentifyConfig {
///
/// Disabled by default.
pub push_listen_addr_updates: bool,

/// How many entries of discovered peers to keep before we discard
/// the least-recently used one.
pub cache_size: usize,
}

impl IdentifyConfig {
Expand All @@ -122,6 +130,7 @@ impl IdentifyConfig {
initial_delay: Duration::from_millis(500),
interval: Duration::from_secs(5 * 60),
push_listen_addr_updates: false,
cache_size: 100,
}
}

Expand Down Expand Up @@ -152,17 +161,26 @@ impl IdentifyConfig {
self.push_listen_addr_updates = b;
self
}

/// Configures the size of the LRU cache for discovered peers.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_cache_size(mut self, cache_size: usize) -> Self {
self.cache_size = cache_size;
self
}
}

impl Identify {
/// Creates a new `Identify` network behaviour.
pub fn new(config: IdentifyConfig) -> Self {
let discovered_peers = LruCache::new(config.cache_size);

Identify {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
}
}

Expand Down Expand Up @@ -254,6 +272,16 @@ impl NetworkBehaviour for Identify {
) {
match event {
IdentifyHandlerEvent::Identified(info) => {
mxinden marked this conversation as resolved.
Show resolved Hide resolved
match self.discovered_peers.get_mut(&peer_id) {
Some(entry) => {
entry.extend(info.listen_addrs.clone());
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
None => {
self.discovered_peers
.put(peer_id, HashSet::from_iter(info.listen_addrs.clone()));
}
}

let observed = info.observed_addr.clone();
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
IdentifyEvent::Received { peer_id, info },
Expand Down Expand Up @@ -388,6 +416,27 @@ impl NetworkBehaviour for Identify {

Poll::Pending
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.discovered_peers
.get(peer)
.cloned()
.map(|addr| Vec::from_iter(addr))
.unwrap_or_default()
}

fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
_: &dyn std::error::Error,
) {
if let Some(peer) = peer_id {
if let Some(entry) = self.discovered_peers.get_mut(peer) {
entry.remove(addr);
}
}
}
}

/// Event emitted by the `Identify` behaviour.
Expand Down Expand Up @@ -552,11 +601,7 @@ mod tests {

let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
);
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};
Expand All @@ -565,9 +610,7 @@ mod tests {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string())
// Delay identification requests so we can test the push protocol.
.with_initial_delay(Duration::from_secs(u32::MAX as u64)),
.with_agent_version("b".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
Expand Down Expand Up @@ -626,4 +669,80 @@ mod tests {
}
})
}

#[test]
fn discover_peer_after_disconnect() {
let _ = env_logger::try_init();

let mut swarm1 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(IdentifyConfig::new("a".to_string(), pubkey.clone()));

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let mut swarm2 = {
let (pubkey, transport) = transport();
let protocol = Identify::new(
IdentifyConfig::new("a".to_string(), pubkey.clone())
.with_agent_version("b".to_string()),
);

Swarm::new(transport, protocol, pubkey.to_peer_id())
};

let swarm1_peer_id = *swarm1.local_peer_id();

let listener = swarm1
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();

let listen_addr = async_std::task::block_on(async {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr {
address,
listener_id,
} if listener_id == listener => return address,
_ => {}
}
}
});

async_std::task::spawn(async move {
loop {
swarm1.next().await;
}
});

swarm2.dial_addr(listen_addr).unwrap();

// wait until we identified
async_std::task::block_on(async {
loop {
if let SwarmEvent::Behaviour(IdentifyEvent::Received { .. }) =
swarm2.select_next_some().await
{
break;
}
}
});

swarm2.disconnect_peer_id(swarm1_peer_id).unwrap();

// we should still be able to dial now!
swarm2.dial(&swarm1_peer_id).unwrap();

let connected_peer = async_std::task::block_on(async {
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
swarm2.select_next_some().await
{
break peer_id;
}
}
});

assert_eq!(connected_peer, swarm1_peer_id);
}
}