Skip to content

Commit

Permalink
fix(identify): handle partial push messages
Browse files Browse the repository at this point in the history
According to the [spec], all fields within push messages are optional. To handle missing fields, we deserialize push messages into a different struct, patch the previously received, full identify message with it and emit this as the new info.

Previously, we failed to parse the message which causes incompatibility with js-libp2p nodes as they don't send the public key in push messages. See https://github.com/libp2p/js-libp2p/blob/88c47f51f9d67a6261e4ac65c494cd1e6e4ed8dd/packages/libp2p/src/identify/identify.ts#L205-L210.

[spec]: https://github.com/libp2p/specs/tree/master/identify#identifypush

Pull-Request: #4495.
  • Loading branch information
Marcel-G authored Sep 24, 2023
1 parent 95890b5 commit ad45d23
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ libp2p-deflate = { version = "0.40.0", path = "transports/deflate" }
libp2p-dns = { version = "0.40.1", path = "transports/dns" }
libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.43.0", path = "protocols/identify" }
libp2p-identify = { version = "0.43.1", path = "protocols/identify" }
libp2p-identity = { version = "0.2.3" }
libp2p-kad = { version = "0.44.5", path = "protocols/kad" }
libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" }
Expand Down
10 changes: 9 additions & 1 deletion protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
## 0.43.0
## 0.43.1 - unreleased

- Handle partial push messages.
Previously, push messages with partial information were ignored.
See [PR 4495].

[PR 4495]: https://github.com/libp2p/rust-libp2p/pull/4495

## 0.43.0

- Observed addresses (aka. external address candidates) of the local node, reported by a remote node via `libp2p-identify`, are no longer automatically considered confirmed external addresses, in other words they are no longer trusted by default.
Instead users need to confirm the reported observed address either manually, or by using `libp2p-autonat`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-identify"
edition = "2021"
rust-version = { workspace = true }
description = "Nodes identifcation protocol for libp2p"
version = "0.43.0"
version = "0.43.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
32 changes: 24 additions & 8 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Identify, InboundPush, Info, OutboundPush, Push, UpgradeError};
use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError};
use crate::protocol::{Info, PushInfo};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
Expand Down Expand Up @@ -48,7 +49,7 @@ use std::{io, task::Context, task::Poll, time::Duration};
/// permitting the underlying connection to be closed.
pub struct Handler {
remote_peer_id: PeerId,
inbound_identify_push: Option<BoxFuture<'static, Result<Info, UpgradeError>>>,
inbound_identify_push: Option<BoxFuture<'static, Result<PushInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<Either<Identify, Push<OutboundPush>>, (), Event, io::Error>; 4],
Expand Down Expand Up @@ -80,6 +81,9 @@ pub struct Handler {
/// Address observed by or for the remote.
observed_addr: Multiaddr,

/// Identify information about the remote peer.
remote_info: Option<Info>,

local_supported_protocols: SupportedProtocols,
remote_supported_protocols: HashSet<StreamProtocol>,
external_addresses: HashSet<Multiaddr>,
Expand Down Expand Up @@ -133,6 +137,7 @@ impl Handler {
observed_addr,
local_supported_protocols: SupportedProtocols::default(),
remote_supported_protocols: HashSet::default(),
remote_info: Default::default(),
external_addresses,
}
}
Expand Down Expand Up @@ -176,7 +181,8 @@ impl Handler {
) {
match output {
future::Either::Left(remote_info) => {
self.update_supported_protocols_for_remote(&remote_info);
self.handle_incoming_info(&remote_info);

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
Expand Down Expand Up @@ -213,6 +219,12 @@ impl Handler {
}
}

fn handle_incoming_info(&mut self, info: &Info) {
self.remote_info.replace(info.clone());

self.update_supported_protocols_for_remote(info);
}

fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) {
let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone());

Expand Down Expand Up @@ -318,11 +330,15 @@ impl ConnectionHandler for Handler {
{
self.inbound_identify_push.take();

if let Ok(info) = res {
self.update_supported_protocols_for_remote(&info);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
if let Ok(remote_push_info) = res {
if let Some(mut info) = self.remote_info.clone() {
info.merge(remote_push_info);
self.handle_incoming_info(&info);

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::Identified(info),
));
};
}
}

Expand Down
181 changes: 135 additions & 46 deletions protocols/identify/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Push<OutboundPush> {
}
}

/// Information of a peer sent in protocol messages.
/// Identify information of a peer sent in protocol messages.
#[derive(Debug, Clone)]
pub struct Info {
/// The public key of the local peer.
Expand All @@ -82,6 +82,41 @@ pub struct Info {
pub observed_addr: Multiaddr,
}

impl Info {
pub fn merge(&mut self, info: PushInfo) {
if let Some(public_key) = info.public_key {
self.public_key = public_key;
}
if let Some(protocol_version) = info.protocol_version {
self.protocol_version = protocol_version;
}
if let Some(agent_version) = info.agent_version {
self.agent_version = agent_version;
}
if !info.listen_addrs.is_empty() {
self.listen_addrs = info.listen_addrs;
}
if !info.protocols.is_empty() {
self.protocols = info.protocols;
}
if let Some(observed_addr) = info.observed_addr {
self.observed_addr = observed_addr;
}
}
}

/// Identify push information of a peer sent in protocol messages.
/// Note that missing fields should be ignored, as peers may choose to send partial updates containing only the fields whose values have changed.
#[derive(Debug, Clone)]
pub struct PushInfo {
pub public_key: Option<PublicKey>,
pub protocol_version: Option<String>,
pub agent_version: Option<String>,
pub listen_addrs: Vec<Multiaddr>,
pub protocols: Vec<StreamProtocol>,
pub observed_addr: Option<Multiaddr>,
}

impl UpgradeInfo for Identify {
type Info = StreamProtocol;
type InfoIter = iter::Once<Self::Info>;
Expand Down Expand Up @@ -110,7 +145,7 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
recv(socket).boxed()
recv_identify(socket).boxed()
}
}

Expand All @@ -127,13 +162,13 @@ impl<C> InboundUpgrade<C> for Push<InboundPush>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = BoxFuture<'static, Result<Info, UpgradeError>>;
type Output = BoxFuture<'static, Result<PushInfo, UpgradeError>>;
type Error = Void;
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
// Lazily upgrade stream, thus allowing upgrade to happen within identify's handler.
future::ok(recv(socket).boxed())
future::ok(recv_push(socket).boxed())
}
}

Expand Down Expand Up @@ -184,7 +219,29 @@ where
Ok(())
}

async fn recv<T>(socket: T) -> Result<Info, UpgradeError>
async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let info = recv(socket).await?.try_into()?;

trace!("Received {:?}", info);

Ok(info)
}

async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let info = recv(socket).await?.try_into()?;

trace!("Received {:?}", info);

Ok(info)
}

async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
Expand All @@ -199,61 +256,93 @@ where
)
.next()
.await
.ok_or(UpgradeError::StreamClosed)??
.try_into()?;

trace!("Received: {:?}", info);
.ok_or(UpgradeError::StreamClosed)??;

Ok(info)
}

impl TryFrom<proto::Identify> for Info {
type Error = UpgradeError;
fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
listen_addrs
.into_iter()
.filter_map(|bytes| match Multiaddr::try_from(bytes) {
Ok(a) => Some(a),
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
None
}
})
.collect()
}

fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
fn parse_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, multiaddr::Error> {
Multiaddr::try_from(bytes)
fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
protocols
.into_iter()
.filter_map(|p| match StreamProtocol::try_from_owned(p) {
Ok(p) => Some(p),
Err(e) => {
debug!("Received invalid protocol from peer: {e}");
None
}
})
.collect()
}

fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
Ok(k) => Some(k),
Err(e) => {
debug!("Unable to decode public key: {e:?}");
None
}
})
}

let listen_addrs = {
let mut addrs = Vec::new();
for addr in msg.listenAddrs.into_iter() {
match parse_multiaddr(addr) {
Ok(a) => addrs.push(a),
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
}
}
}
addrs
};
fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
Ok(a) => Some(a),
Err(e) => {
debug!("Unable to parse observed multiaddr: {e:?}");
None
}
})
}

let public_key = PublicKey::try_decode_protobuf(&msg.publicKey.unwrap_or_default())?;
impl TryFrom<proto::Identify> for Info {
type Error = UpgradeError;

let observed_addr = match parse_multiaddr(msg.observedAddr.unwrap_or_default()) {
Ok(a) => a,
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
Multiaddr::empty()
fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
let public_key = {
match parse_public_key(msg.publicKey) {
Some(key) => key,
// This will always produce a DecodingError if the public key is missing.
None => PublicKey::try_decode_protobuf(Default::default())?,
}
};

let info = Info {
public_key,
protocol_version: msg.protocolVersion.unwrap_or_default(),
agent_version: msg.agentVersion.unwrap_or_default(),
listen_addrs,
protocols: msg
.protocols
.into_iter()
.filter_map(|p| match StreamProtocol::try_from_owned(p) {
Ok(p) => Some(p),
Err(e) => {
debug!("Received invalid protocol from peer: {e}");
None
}
})
.collect(),
observed_addr,
listen_addrs: parse_listen_addrs(msg.listenAddrs),
protocols: parse_protocols(msg.protocols),
observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
};

Ok(info)
}
}

impl TryFrom<proto::Identify> for PushInfo {
type Error = UpgradeError;

fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
let info = PushInfo {
public_key: parse_public_key(msg.publicKey),
protocol_version: msg.protocolVersion,
agent_version: msg.agentVersion,
listen_addrs: parse_listen_addrs(msg.listenAddrs),
protocols: parse_protocols(msg.protocols),
observed_addr: parse_observed_addr(msg.observedAddr),
};

Ok(info)
Expand Down Expand Up @@ -303,7 +392,7 @@ mod tests {
),
};

let info = Info::try_from(payload).expect("not to fail");
let info = PushInfo::try_from(payload).expect("not to fail");

assert_eq!(info.listen_addrs, vec![valid_multiaddr])
}
Expand Down

0 comments on commit ad45d23

Please sign in to comment.