diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index eea59d5b161..17e776bea79 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3205,6 +3205,9 @@ where NetworkBehaviourAction::CloseConnection { peer_id, connection } => { NetworkBehaviourAction::CloseConnection { peer_id, connection } } + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => { + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } + } }); } diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index d6bc6abb5bc..42ba8a80ef1 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -255,6 +255,13 @@ impl NetworkBehaviour for Identify { match event { IdentifyHandlerEvent::Identified(info) => { let observed = info.observed_addr.clone(); + let listen_addresses = info.listen_addrs.clone(); + + self.events.extend( + listen_addresses + .into_iter() + .map(|address| NetworkBehaviourAction::ReportPeerAddr { peer_id, address }) + ); self.events.push_back( NetworkBehaviourAction::GenerateEvent( IdentifyEvent::Received { diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 68c7e96a58e..ee7af484217 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -615,6 +615,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl break; } SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} + SwarmEvent::AddressBookUpdated { .. } => {} e => panic!("{:?}", e), } } @@ -626,6 +627,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl break; } SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} + SwarmEvent::AddressBookUpdated { .. } => {} e => panic!("{:?}", e), } } @@ -682,6 +684,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { .. })) => {} + SwarmEvent::AddressBookUpdated { .. } => {} e => panic!("{:?}", e), } } @@ -700,6 +703,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { .. })) => {} + SwarmEvent::AddressBookUpdated { .. } => {} e => panic!("{:?}", e), } } diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index 611331f4f52..cb639aeed82 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -659,7 +659,9 @@ where | NetworkBehaviourAction::ReportObservedAddr { address, score } => NetworkBehaviourAction::ReportObservedAddr { address, score }, | NetworkBehaviourAction::CloseConnection { peer_id, connection } => - NetworkBehaviourAction::CloseConnection { peer_id, connection } + NetworkBehaviourAction::CloseConnection { peer_id, connection }, + | NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } }; return Poll::Ready(event) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index a5cdf4900ca..715d35c3551 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -482,6 +482,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection }) => { return std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection }); } + std::task::Poll::Ready(#network_behaviour_action::ReportPeerAddr { peer_id, address }) => { + return std::task::Poll::Ready(#network_behaviour_action::ReportPeerAddr { peer_id, address }); + } std::task::Poll::Pending => break, } } diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 432cef9514b..701982117ee 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -294,6 +294,24 @@ pub enum NetworkBehaviourAction { score: AddressScore, }, + /// Informs the [`Swarm`] about an address observed for a particular peer. + /// + /// The address will be added to the [`Swarm`]s address book and will be + /// taken into account when a new connection needs to be established. + /// + /// [`Swarm`]: crate::Swarm + ReportPeerAddr { + /// The peer who's address we learned. + peer_id: PeerId, + /// The address where the peer is supposedly reachable. + address: Multiaddr, + }, + + // TODO: Include once peer records are merged + // ReportPeerRecord { + // + // }, + /// Instructs the `Swarm` to initiate a graceful close of one or all connections /// with the given peer. /// @@ -331,7 +349,10 @@ impl NetworkBehaviourAction { NetworkBehaviourAction::ReportObservedAddr { address, score } => NetworkBehaviourAction::ReportObservedAddr { address, score }, NetworkBehaviourAction::CloseConnection { peer_id, connection } => - NetworkBehaviourAction::CloseConnection { peer_id, connection } + NetworkBehaviourAction::CloseConnection { peer_id, connection }, + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => { + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } + } } } @@ -349,7 +370,10 @@ impl NetworkBehaviourAction { NetworkBehaviourAction::ReportObservedAddr { address, score } => NetworkBehaviourAction::ReportObservedAddr { address, score }, NetworkBehaviourAction::CloseConnection { peer_id, connection } => - NetworkBehaviourAction::CloseConnection { peer_id, connection } + NetworkBehaviourAction::CloseConnection { peer_id, connection }, + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } => { + NetworkBehaviourAction::ReportPeerAddr { peer_id, address } + } } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f79ef5a0462..3b1363fd464 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -127,7 +127,7 @@ use libp2p_core::{ use registry::{Addresses, AddressIntoIter}; use smallvec::SmallVec; use std::{error, fmt, io, pin::Pin, task::{Context, Poll}}; -use std::collections::HashSet; +use std::collections::{HashSet, HashMap, VecDeque}; use std::num::{NonZeroU32, NonZeroUsize}; use upgrade::UpgradeInfoSend as _; @@ -268,6 +268,12 @@ pub enum SwarmEvent { /// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported /// with `attempts_remaining` equal to 0. Dialing(PeerId), + + /// Our address book was updated. + AddressBookUpdated { + peer_id: PeerId, + address: Multiaddr + } } /// Contains the state of the network, plus the way it should behave. @@ -309,6 +315,9 @@ where /// The configured override for substream protocol upgrades, if any. substream_upgrade_protocol_override: Option, + + /// Addresses of peers that we've learned about so far. + address_book: AddressBook, } impl Unpin for @@ -375,9 +384,12 @@ where TBehaviour: NetworkBehaviour, } let self_listening = &self.listened_addrs; - let mut addrs = self.behaviour.addresses_of_peer(peer_id) - .into_iter() - .filter(|a| !self_listening.contains(a)); + let from_address_book = self.address_book.addresses_of_peer(peer_id); + let from_behaviour = self.behaviour.addresses_of_peer(peer_id); + + let mut addrs = from_address_book + .chain(from_behaviour) + .filter(|address| !self_listening.contains(address)); let result = if let Some(first) = addrs.next() { @@ -741,8 +753,13 @@ where TBehaviour: NetworkBehaviour, log::trace!("Condition for new dialing attempt to {:?} not met: {:?}", peer_id, condition); let self_listening = &this.listened_addrs; + + let from_address_book = this.address_book.addresses_of_peer(&peer_id); + let from_behaviour = this.behaviour.addresses_of_peer(&peer_id); + + let addrs = from_address_book.chain(from_behaviour); + if let Some(mut peer) = this.network.peer(peer_id).into_dialing() { - let addrs = this.behaviour.addresses_of_peer(peer.id()); let mut attempt = peer.some_attempt(); for a in addrs { if !self_listening.contains(&a) { @@ -795,11 +812,38 @@ where TBehaviour: NetworkBehaviour, } } }, + Poll::Ready(NetworkBehaviourAction::ReportPeerAddr { peer_id, address }) => { + this.address_book.add_entry(peer_id, address.clone()); + + return Poll::Ready(SwarmEvent::AddressBookUpdated { peer_id, address }) + }, } } } } +/// Internal storage of addresses of other peers known to the [`Swarm`]. +#[derive(Debug, Default)] +struct AddressBook { + inner: HashMap> +} + +impl AddressBook { + /// Adds an entry to the address book. + /// + /// Newly added entries are assumed to be more likely to be reachable and + /// are hence added to the front. This is in-line with the [`Swarm`]'s + /// expectation of trying to dial addresses in the order of their + /// likelihood to yield a connection. + fn add_entry(&mut self, peer: PeerId, address: Multiaddr) { + self.inner.entry(peer).or_default().push_front(address); + } + + fn addresses_of_peer(&self, peer: &PeerId) -> impl Iterator { + self.inner.get(peer).cloned().unwrap_or_default().into_iter() + } +} + /// Connection to notify of a pending event. /// /// The connection IDs out of which to notify one of an event are captured at @@ -1093,6 +1137,7 @@ where TBehaviour: NetworkBehaviour, banned_peers: HashSet::new(), pending_event: None, substream_upgrade_protocol_override: self.substream_upgrade_protocol_override, + address_book: AddressBook::default() } } } @@ -1565,4 +1610,66 @@ mod tests { } })) } + + #[test] + fn reported_addresses_are_used_for_dialling() { + let mut swarm1 = new_test_swarm::<_, ()>(DummyProtocolsHandler { + keep_alive: KeepAlive::Yes + }); + let mut swarm2 = new_test_swarm::<_, ()>(DummyProtocolsHandler { + keep_alive: KeepAlive::Yes + }); + let swarm1_peer_id = *swarm1.local_peer_id(); + let swarm2_peer_id = *swarm2.local_peer_id(); + let listen_address = Multiaddr::empty().with(multiaddr::Protocol::Memory(rand::random())); + + executor::block_on(async { + swarm1.listen_on(listen_address.clone()).unwrap(); + + loop { + if let SwarmEvent::NewListenAddr { .. } = swarm1.select_next_some().await { + break + } + } + }); + executor::block_on(async { + swarm2.behaviour.inner().next_action = Some(NetworkBehaviourAction::ReportPeerAddr { + peer_id: swarm1_peer_id, + address: listen_address + }); + loop { + if let SwarmEvent::AddressBookUpdated { .. } = swarm2.select_next_some().await { + break + } + } + }); + + swarm2.dial(&swarm1_peer_id).unwrap(); + + executor::block_on(async { + let mut swarm1_connected = false; + let mut swarm2_connected = false; + + while !swarm1_connected || !swarm2_connected { + futures::select! { + event = swarm1.select_next_some() => { + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == swarm2_peer_id => { + swarm1_connected = true; + } + _ => {} + } + } + event = swarm2.select_next_some() => { + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == swarm1_peer_id => { + swarm2_connected = true; + } + _ => {} + } + } + } + } + }) + } }