diff --git a/Cargo.lock b/Cargo.lock index 592ea05fc7e..fb77906f3ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2610,7 +2610,7 @@ dependencies = [ [[package]] name = "libp2p-identify" -version = "0.43.0" +version = "0.43.1" dependencies = [ "async-std", "asynchronous-codec", diff --git a/Cargo.toml b/Cargo.toml index 9685bd42d5f..3c3adb74575 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ libp2p-deflate = { version = "0.40.0", path = "transports/deflate" } libp2p-dns = { version = "0.40.1", path = "transports/dns" } libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } -libp2p-identify = { version = "0.43.0", path = "protocols/identify" } +libp2p-identify = { version = "0.43.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.3" } libp2p-kad = { version = "0.44.5", path = "protocols/kad" } libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" } diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 79632954718..0e8812b1b42 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,4 +1,12 @@ -## 0.43.0 +## 0.43.1 - unreleased + +- Handle partial push messages. + Previously, push messages with partial information were ignored. + See [PR 4495]. + +[PR 4495]: https://github.com/libp2p/rust-libp2p/pull/4495 + +## 0.43.0 - Observed addresses (aka. external address candidates) of the local node, reported by a remote node via `libp2p-identify`, are no longer automatically considered confirmed external addresses, in other words they are no longer trusted by default. Instead users need to confirm the reported observed address either manually, or by using `libp2p-autonat`. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 6e892538669..0f40f8cb01c 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-identify" edition = "2021" rust-version = { workspace = true } description = "Nodes identifcation protocol for libp2p" -version = "0.43.0" +version = "0.43.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 5a1712e8c3d..162a1e8fb06 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,7 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{Identify, InboundPush, Info, OutboundPush, Push, UpgradeError}; +use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError}; +use crate::protocol::{Info, PushInfo}; use either::Either; use futures::future::BoxFuture; use futures::prelude::*; @@ -48,7 +49,7 @@ use std::{io, task::Context, task::Poll, time::Duration}; /// permitting the underlying connection to be closed. pub struct Handler { remote_peer_id: PeerId, - inbound_identify_push: Option>>, + inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< [ConnectionHandlerEvent>, (), Event, io::Error>; 4], @@ -80,6 +81,9 @@ pub struct Handler { /// Address observed by or for the remote. observed_addr: Multiaddr, + /// Identify information about the remote peer. + remote_info: Option, + local_supported_protocols: SupportedProtocols, remote_supported_protocols: HashSet, external_addresses: HashSet, @@ -133,6 +137,7 @@ impl Handler { observed_addr, local_supported_protocols: SupportedProtocols::default(), remote_supported_protocols: HashSet::default(), + remote_info: Default::default(), external_addresses, } } @@ -176,7 +181,8 @@ impl Handler { ) { match output { future::Either::Left(remote_info) => { - self.update_supported_protocols_for_remote(&remote_info); + self.handle_incoming_info(&remote_info); + self.events .push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( remote_info, @@ -213,6 +219,12 @@ impl Handler { } } + fn handle_incoming_info(&mut self, info: &Info) { + self.remote_info.replace(info.clone()); + + self.update_supported_protocols_for_remote(info); + } + fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) { let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone()); @@ -318,11 +330,15 @@ impl ConnectionHandler for Handler { { self.inbound_identify_push.take(); - if let Ok(info) = res { - self.update_supported_protocols_for_remote(&info); - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( - info, - ))); + if let Ok(remote_push_info) = res { + if let Some(mut info) = self.remote_info.clone() { + info.merge(remote_push_info); + self.handle_incoming_info(&info); + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identified(info), + )); + }; } } diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index a508591b106..989c94a4d67 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -63,7 +63,7 @@ impl Push { } } -/// Information of a peer sent in protocol messages. +/// Identify information of a peer sent in protocol messages. #[derive(Debug, Clone)] pub struct Info { /// The public key of the local peer. @@ -82,6 +82,41 @@ pub struct Info { pub observed_addr: Multiaddr, } +impl Info { + pub fn merge(&mut self, info: PushInfo) { + if let Some(public_key) = info.public_key { + self.public_key = public_key; + } + if let Some(protocol_version) = info.protocol_version { + self.protocol_version = protocol_version; + } + if let Some(agent_version) = info.agent_version { + self.agent_version = agent_version; + } + if !info.listen_addrs.is_empty() { + self.listen_addrs = info.listen_addrs; + } + if !info.protocols.is_empty() { + self.protocols = info.protocols; + } + if let Some(observed_addr) = info.observed_addr { + self.observed_addr = observed_addr; + } + } +} + +/// Identify push information of a peer sent in protocol messages. +/// Note that missing fields should be ignored, as peers may choose to send partial updates containing only the fields whose values have changed. +#[derive(Debug, Clone)] +pub struct PushInfo { + pub public_key: Option, + pub protocol_version: Option, + pub agent_version: Option, + pub listen_addrs: Vec, + pub protocols: Vec, + pub observed_addr: Option, +} + impl UpgradeInfo for Identify { type Info = StreamProtocol; type InfoIter = iter::Once; @@ -110,7 +145,7 @@ where type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - recv(socket).boxed() + recv_identify(socket).boxed() } } @@ -127,13 +162,13 @@ impl InboundUpgrade for Push where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = BoxFuture<'static, Result>; + type Output = BoxFuture<'static, Result>; type Error = Void; type Future = future::Ready>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { // Lazily upgrade stream, thus allowing upgrade to happen within identify's handler. - future::ok(recv(socket).boxed()) + future::ok(recv_push(socket).boxed()) } } @@ -184,7 +219,29 @@ where Ok(()) } -async fn recv(socket: T) -> Result +async fn recv_push(socket: T) -> Result +where + T: AsyncRead + AsyncWrite + Unpin, +{ + let info = recv(socket).await?.try_into()?; + + trace!("Received {:?}", info); + + Ok(info) +} + +async fn recv_identify(socket: T) -> Result +where + T: AsyncRead + AsyncWrite + Unpin, +{ + let info = recv(socket).await?.try_into()?; + + trace!("Received {:?}", info); + + Ok(info) +} + +async fn recv(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { @@ -199,61 +256,93 @@ where ) .next() .await - .ok_or(UpgradeError::StreamClosed)?? - .try_into()?; - - trace!("Received: {:?}", info); + .ok_or(UpgradeError::StreamClosed)??; Ok(info) } -impl TryFrom for Info { - type Error = UpgradeError; +fn parse_listen_addrs(listen_addrs: Vec>) -> Vec { + listen_addrs + .into_iter() + .filter_map(|bytes| match Multiaddr::try_from(bytes) { + Ok(a) => Some(a), + Err(e) => { + debug!("Unable to parse multiaddr: {e:?}"); + None + } + }) + .collect() +} - fn try_from(msg: proto::Identify) -> Result { - fn parse_multiaddr(bytes: Vec) -> Result { - Multiaddr::try_from(bytes) +fn parse_protocols(protocols: Vec) -> Vec { + protocols + .into_iter() + .filter_map(|p| match StreamProtocol::try_from_owned(p) { + Ok(p) => Some(p), + Err(e) => { + debug!("Received invalid protocol from peer: {e}"); + None + } + }) + .collect() +} + +fn parse_public_key(public_key: Option>) -> Option { + public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) { + Ok(k) => Some(k), + Err(e) => { + debug!("Unable to decode public key: {e:?}"); + None } + }) +} - let listen_addrs = { - let mut addrs = Vec::new(); - for addr in msg.listenAddrs.into_iter() { - match parse_multiaddr(addr) { - Ok(a) => addrs.push(a), - Err(e) => { - debug!("Unable to parse multiaddr: {e:?}"); - } - } - } - addrs - }; +fn parse_observed_addr(observed_addr: Option>) -> Option { + observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) { + Ok(a) => Some(a), + Err(e) => { + debug!("Unable to parse observed multiaddr: {e:?}"); + None + } + }) +} - let public_key = PublicKey::try_decode_protobuf(&msg.publicKey.unwrap_or_default())?; +impl TryFrom for Info { + type Error = UpgradeError; - let observed_addr = match parse_multiaddr(msg.observedAddr.unwrap_or_default()) { - Ok(a) => a, - Err(e) => { - debug!("Unable to parse multiaddr: {e:?}"); - Multiaddr::empty() + fn try_from(msg: proto::Identify) -> Result { + let public_key = { + match parse_public_key(msg.publicKey) { + Some(key) => key, + // This will always produce a DecodingError if the public key is missing. + None => PublicKey::try_decode_protobuf(Default::default())?, } }; + let info = Info { public_key, protocol_version: msg.protocolVersion.unwrap_or_default(), agent_version: msg.agentVersion.unwrap_or_default(), - listen_addrs, - protocols: msg - .protocols - .into_iter() - .filter_map(|p| match StreamProtocol::try_from_owned(p) { - Ok(p) => Some(p), - Err(e) => { - debug!("Received invalid protocol from peer: {e}"); - None - } - }) - .collect(), - observed_addr, + listen_addrs: parse_listen_addrs(msg.listenAddrs), + protocols: parse_protocols(msg.protocols), + observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()), + }; + + Ok(info) + } +} + +impl TryFrom for PushInfo { + type Error = UpgradeError; + + fn try_from(msg: proto::Identify) -> Result { + let info = PushInfo { + public_key: parse_public_key(msg.publicKey), + protocol_version: msg.protocolVersion, + agent_version: msg.agentVersion, + listen_addrs: parse_listen_addrs(msg.listenAddrs), + protocols: parse_protocols(msg.protocols), + observed_addr: parse_observed_addr(msg.observedAddr), }; Ok(info) @@ -303,7 +392,7 @@ mod tests { ), }; - let info = Info::try_from(payload).expect("not to fail"); + let info = PushInfo::try_from(payload).expect("not to fail"); assert_eq!(info.listen_addrs, vec![valid_multiaddr]) }