Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an AddressBook to Swarm #2133

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3205,6 +3205,9 @@ where
NetworkBehaviourAction::CloseConnection { peer_id, connection } => {
NetworkBehaviourAction::CloseConnection { peer_id, connection }
}
NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => {
NetworkBehaviourAction::ReportPeerAddr { peer_id, address }
}
});
}

Expand Down
7 changes: 7 additions & 0 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ impl NetworkBehaviour for Identify {
match event {
IdentifyHandlerEvent::Identified(info) => {
let observed = info.observed_addr.clone();
let listen_addresses = info.listen_addrs.clone();

self.events.extend(
listen_addresses
.into_iter()
.map(|address| NetworkBehaviourAction::ReportPeerAddr { peer_id, address })
);
self.events.push_back(
NetworkBehaviourAction::GenerateEvent(
IdentifyEvent::Received {
Expand Down
4 changes: 4 additions & 0 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
break;
}
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::AddressBookUpdated { .. } => {}
e => panic!("{:?}", e),
}
}
Expand All @@ -626,6 +627,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
break;
}
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::AddressBookUpdated { .. } => {}
e => panic!("{:?}", e),
}
}
Expand Down Expand Up @@ -682,6 +684,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
})) => {}
SwarmEvent::AddressBookUpdated { .. } => {}
e => panic!("{:?}", e),
}
}
Expand All @@ -700,6 +703,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
..
})) => {}
SwarmEvent::AddressBookUpdated { .. } => {}
e => panic!("{:?}", e),
}
}
Expand Down
4 changes: 3 additions & 1 deletion protocols/request-response/src/throttled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,9 @@ where
| NetworkBehaviourAction::ReportObservedAddr { address, score } =>
NetworkBehaviourAction::ReportObservedAddr { address, score },
| NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
NetworkBehaviourAction::CloseConnection { peer_id, connection }
NetworkBehaviourAction::CloseConnection { peer_id, connection },
| NetworkBehaviourAction::ReportPeerAddr { peer_id, address } =>
NetworkBehaviourAction::ReportPeerAddr { peer_id, address }
};

return Poll::Ready(event)
Expand Down
3 changes: 3 additions & 0 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection }) => {
return std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection });
}
std::task::Poll::Ready(#network_behaviour_action::ReportPeerAddr { peer_id, address }) => {
return std::task::Poll::Ready(#network_behaviour_action::ReportPeerAddr { peer_id, address });
}
std::task::Poll::Pending => break,
}
}
Expand Down
28 changes: 26 additions & 2 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,24 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
score: AddressScore,
},

/// Informs the [`Swarm`] about an address observed for a particular peer.
///
/// The address will be added to the [`Swarm`]s address book and will be
/// taken into account when a new connection needs to be established.
///
/// [`Swarm`]: crate::Swarm
ReportPeerAddr {
/// The peer who's address we learned.
peer_id: PeerId,
/// The address where the peer is supposedly reachable.
address: Multiaddr,
},

// TODO: Include once peer records are merged
// ReportPeerRecord {
//
// },

/// Instructs the `Swarm` to initiate a graceful close of one or all connections
/// with the given peer.
///
Expand Down Expand Up @@ -331,7 +349,10 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
NetworkBehaviourAction::ReportObservedAddr { address, score },
NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
NetworkBehaviourAction::CloseConnection { peer_id, connection }
NetworkBehaviourAction::CloseConnection { peer_id, connection },
NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => {
NetworkBehaviourAction::ReportPeerAddr { peer_id, address }
}
}
}

Expand All @@ -349,7 +370,10 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
NetworkBehaviourAction::ReportObservedAddr { address, score },
NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
NetworkBehaviourAction::CloseConnection { peer_id, connection }
NetworkBehaviourAction::CloseConnection { peer_id, connection },
NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => {
NetworkBehaviourAction::ReportPeerAddr { peer_id, address }
}
}
}
}
Expand Down
117 changes: 112 additions & 5 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ use libp2p_core::{
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, io, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use std::collections::{HashSet, HashMap, VecDeque};
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;

Expand Down Expand Up @@ -268,6 +268,12 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
/// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported
/// with `attempts_remaining` equal to 0.
Dialing(PeerId),

/// Our address book was updated.
AddressBookUpdated {
peer_id: PeerId,
address: Multiaddr
}
}

/// Contains the state of the network, plus the way it should behave.
Expand Down Expand Up @@ -309,6 +315,9 @@ where

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,

/// Addresses of peers that we've learned about so far.
address_book: AddressBook,
}

impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
Expand Down Expand Up @@ -375,9 +384,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}

let self_listening = &self.listened_addrs;
let mut addrs = self.behaviour.addresses_of_peer(peer_id)
.into_iter()
.filter(|a| !self_listening.contains(a));
let from_address_book = self.address_book.addresses_of_peer(peer_id);
let from_behaviour = self.behaviour.addresses_of_peer(peer_id);

let mut addrs = from_address_book
.chain(from_behaviour)
.filter(|address| !self_listening.contains(address));

let result =
if let Some(first) = addrs.next() {
Expand Down Expand Up @@ -741,8 +753,13 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
peer_id, condition);
let self_listening = &this.listened_addrs;

let from_address_book = this.address_book.addresses_of_peer(&peer_id);
let from_behaviour = this.behaviour.addresses_of_peer(&peer_id);

let addrs = from_address_book.chain(from_behaviour);

if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
let addrs = this.behaviour.addresses_of_peer(peer.id());
let mut attempt = peer.some_attempt();
for a in addrs {
if !self_listening.contains(&a) {
Expand Down Expand Up @@ -795,11 +812,38 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}
}
},
Poll::Ready(NetworkBehaviourAction::ReportPeerAddr { peer_id, address }) => {
this.address_book.add_entry(peer_id, address.clone());

return Poll::Ready(SwarmEvent::AddressBookUpdated { peer_id, address })
},
}
}
}
}

/// Internal storage of addresses of other peers known to the [`Swarm`].
#[derive(Debug, Default)]
struct AddressBook {
inner: HashMap<PeerId, VecDeque<Multiaddr>>
}

impl AddressBook {
/// Adds an entry to the address book.
///
/// Newly added entries are assumed to be more likely to be reachable and
/// are hence added to the front. This is in-line with the [`Swarm`]'s
/// expectation of trying to dial addresses in the order of their
/// likelihood to yield a connection.
fn add_entry(&mut self, peer: PeerId, address: Multiaddr) {
self.inner.entry(peer).or_default().push_front(address);
}

fn addresses_of_peer(&self, peer: &PeerId) -> impl Iterator<Item=Multiaddr> {
self.inner.get(peer).cloned().unwrap_or_default().into_iter()
}
}

/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
Expand Down Expand Up @@ -1093,6 +1137,7 @@ where TBehaviour: NetworkBehaviour,
banned_peers: HashSet::new(),
pending_event: None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
address_book: AddressBook::default()
}
}
}
Expand Down Expand Up @@ -1565,4 +1610,66 @@ mod tests {
}
}))
}

#[test]
fn reported_addresses_are_used_for_dialling() {
let mut swarm1 = new_test_swarm::<_, ()>(DummyProtocolsHandler {
keep_alive: KeepAlive::Yes
});
let mut swarm2 = new_test_swarm::<_, ()>(DummyProtocolsHandler {
keep_alive: KeepAlive::Yes
});
let swarm1_peer_id = *swarm1.local_peer_id();
let swarm2_peer_id = *swarm2.local_peer_id();
let listen_address = Multiaddr::empty().with(multiaddr::Protocol::Memory(rand::random()));

executor::block_on(async {
swarm1.listen_on(listen_address.clone()).unwrap();

loop {
if let SwarmEvent::NewListenAddr { .. } = swarm1.select_next_some().await {
break
}
}
});
executor::block_on(async {
swarm2.behaviour.inner().next_action = Some(NetworkBehaviourAction::ReportPeerAddr {
peer_id: swarm1_peer_id,
address: listen_address
});
loop {
if let SwarmEvent::AddressBookUpdated { .. } = swarm2.select_next_some().await {
break
}
}
});

swarm2.dial(&swarm1_peer_id).unwrap();

executor::block_on(async {
let mut swarm1_connected = false;
let mut swarm2_connected = false;

while !swarm1_connected || !swarm2_connected {
futures::select! {
event = swarm1.select_next_some() => {
match event {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == swarm2_peer_id => {
swarm1_connected = true;
}
_ => {}
}
}
event = swarm2.select_next_some() => {
match event {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == swarm1_peer_id => {
swarm2_connected = true;
}
_ => {}
}
}
}
}
})
}
}