Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(identify): handle partial push messages #4495

Merged
merged 13 commits into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ libp2p-deflate = { version = "0.40.0", path = "transports/deflate" }
libp2p-dns = { version = "0.40.1", path = "transports/dns" }
libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.43.0", path = "protocols/identify" }
libp2p-identify = { version = "0.43.1", path = "protocols/identify" }
libp2p-identity = { version = "0.2.3" }
libp2p-kad = { version = "0.44.5", path = "protocols/kad" }
libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" }
Expand Down
10 changes: 9 additions & 1 deletion protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
## 0.43.0
## 0.43.1 - unreleased

- Handle partial push messages.
Previously, push messages with partial information were ignored.
See [PR 4495].

[PR 4495]: https://github.com/libp2p/rust-libp2p/pull/4495

## 0.43.0

- Observed addresses (aka. external address candidates) of the local node, reported by a remote node via `libp2p-identify`, are no longer automatically considered confirmed external addresses, in other words they are no longer trusted by default.
Instead users need to confirm the reported observed address either manually, or by using `libp2p-autonat`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-identify"
edition = "2021"
rust-version = { workspace = true }
description = "Nodes identifcation protocol for libp2p"
version = "0.43.0"
version = "0.43.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
32 changes: 24 additions & 8 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Identify, InboundPush, Info, OutboundPush, Push, UpgradeError};
use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError};
use crate::protocol::{Info, PushInfo};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
Expand Down Expand Up @@ -48,7 +49,7 @@ use std::{io, task::Context, task::Poll, time::Duration};
/// permitting the underlying connection to be closed.
pub struct Handler {
remote_peer_id: PeerId,
inbound_identify_push: Option<BoxFuture<'static, Result<Info, UpgradeError>>>,
inbound_identify_push: Option<BoxFuture<'static, Result<PushInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<Either<Identify, Push<OutboundPush>>, (), Event, io::Error>; 4],
Expand Down Expand Up @@ -80,6 +81,9 @@ pub struct Handler {
/// Address observed by or for the remote.
observed_addr: Multiaddr,

/// Identify information about the remote peer.
remote_info: Option<Info>,

local_supported_protocols: SupportedProtocols,
remote_supported_protocols: HashSet<StreamProtocol>,
external_addresses: HashSet<Multiaddr>,
Expand Down Expand Up @@ -133,6 +137,7 @@ impl Handler {
observed_addr,
local_supported_protocols: SupportedProtocols::default(),
remote_supported_protocols: HashSet::default(),
remote_info: Default::default(),
external_addresses,
}
}
Expand Down Expand Up @@ -176,7 +181,8 @@ impl Handler {
) {
match output {
future::Either::Left(remote_info) => {
self.update_supported_protocols_for_remote(&remote_info);
self.handle_incoming_info(&remote_info);

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
Expand Down Expand Up @@ -213,6 +219,12 @@ impl Handler {
}
}

fn handle_incoming_info(&mut self, info: &Info) {
self.remote_info.replace(info.clone());

self.update_supported_protocols_for_remote(info);
}

fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) {
let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone());

Expand Down Expand Up @@ -318,11 +330,15 @@ impl ConnectionHandler for Handler {
{
self.inbound_identify_push.take();

if let Ok(info) = res {
self.update_supported_protocols_for_remote(&info);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
if let Ok(remote_push_info) = res {
if let Some(mut info) = self.remote_info.clone() {
info.merge(remote_push_info);
self.handle_incoming_info(&info);

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::Identified(info),
));
};
}
}

Expand Down
181 changes: 135 additions & 46 deletions protocols/identify/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Push<OutboundPush> {
}
}

/// Information of a peer sent in protocol messages.
/// Identify information of a peer sent in protocol messages.
#[derive(Debug, Clone)]
pub struct Info {
/// The public key of the local peer.
Expand All @@ -82,6 +82,41 @@ pub struct Info {
pub observed_addr: Multiaddr,
}

impl Info {
pub fn merge(&mut self, info: PushInfo) {
if let Some(public_key) = info.public_key {
self.public_key = public_key;
}
if let Some(protocol_version) = info.protocol_version {
self.protocol_version = protocol_version;
}
if let Some(agent_version) = info.agent_version {
self.agent_version = agent_version;
}
if !info.listen_addrs.is_empty() {
self.listen_addrs = info.listen_addrs;
}
if !info.protocols.is_empty() {
self.protocols = info.protocols;
}
if let Some(observed_addr) = info.observed_addr {
self.observed_addr = observed_addr;
}
}
}

/// Identify push information of a peer sent in protocol messages.
/// Note that missing fields should be ignored, as peers may choose to send partial updates containing only the fields whose values have changed.
#[derive(Debug, Clone)]
pub struct PushInfo {
pub public_key: Option<PublicKey>,
pub protocol_version: Option<String>,
pub agent_version: Option<String>,
pub listen_addrs: Vec<Multiaddr>,
pub protocols: Vec<StreamProtocol>,
pub observed_addr: Option<Multiaddr>,
}

impl UpgradeInfo for Identify {
type Info = StreamProtocol;
type InfoIter = iter::Once<Self::Info>;
Expand Down Expand Up @@ -110,7 +145,7 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
recv(socket).boxed()
recv_identify(socket).boxed()
}
}

Expand All @@ -127,13 +162,13 @@ impl<C> InboundUpgrade<C> for Push<InboundPush>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = BoxFuture<'static, Result<Info, UpgradeError>>;
type Output = BoxFuture<'static, Result<PushInfo, UpgradeError>>;
type Error = Void;
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
// Lazily upgrade stream, thus allowing upgrade to happen within identify's handler.
future::ok(recv(socket).boxed())
future::ok(recv_push(socket).boxed())
}
}

Expand Down Expand Up @@ -184,7 +219,29 @@ where
Ok(())
}

async fn recv<T>(socket: T) -> Result<Info, UpgradeError>
async fn recv_push<T>(socket: T) -> Result<PushInfo, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let info = recv(socket).await?.try_into()?;

trace!("Received {:?}", info);

Ok(info)
}

async fn recv_identify<T>(socket: T) -> Result<Info, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
let info = recv(socket).await?.try_into()?;

trace!("Received {:?}", info);

Ok(info)
}

async fn recv<T>(socket: T) -> Result<proto::Identify, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
Expand All @@ -199,61 +256,93 @@ where
)
.next()
.await
.ok_or(UpgradeError::StreamClosed)??
.try_into()?;

trace!("Received: {:?}", info);
.ok_or(UpgradeError::StreamClosed)??;

Ok(info)
}

impl TryFrom<proto::Identify> for Info {
type Error = UpgradeError;
fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
listen_addrs
.into_iter()
.filter_map(|bytes| match Multiaddr::try_from(bytes) {
Ok(a) => Some(a),
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
None
}
})
.collect()
}

fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
fn parse_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, multiaddr::Error> {
Multiaddr::try_from(bytes)
fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
protocols
.into_iter()
.filter_map(|p| match StreamProtocol::try_from_owned(p) {
Ok(p) => Some(p),
Err(e) => {
debug!("Received invalid protocol from peer: {e}");
None
}
})
.collect()
}

fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
public_key.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
Ok(k) => Some(k),
Err(e) => {
debug!("Unable to decode public key: {e:?}");
None
}
})
}

let listen_addrs = {
let mut addrs = Vec::new();
for addr in msg.listenAddrs.into_iter() {
match parse_multiaddr(addr) {
Ok(a) => addrs.push(a),
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
}
}
}
addrs
};
fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
Ok(a) => Some(a),
Err(e) => {
debug!("Unable to parse observed multiaddr: {e:?}");
None
}
})
}

let public_key = PublicKey::try_decode_protobuf(&msg.publicKey.unwrap_or_default())?;
impl TryFrom<proto::Identify> for Info {
type Error = UpgradeError;

let observed_addr = match parse_multiaddr(msg.observedAddr.unwrap_or_default()) {
Ok(a) => a,
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
Multiaddr::empty()
fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
let public_key = {
match parse_public_key(msg.publicKey) {
Some(key) => key,
// This will always produce a DecodingError if the public key is missing.
None => PublicKey::try_decode_protobuf(Default::default())?,
}
};

let info = Info {
public_key,
protocol_version: msg.protocolVersion.unwrap_or_default(),
agent_version: msg.agentVersion.unwrap_or_default(),
listen_addrs,
protocols: msg
.protocols
.into_iter()
.filter_map(|p| match StreamProtocol::try_from_owned(p) {
Ok(p) => Some(p),
Err(e) => {
debug!("Received invalid protocol from peer: {e}");
None
}
})
.collect(),
observed_addr,
listen_addrs: parse_listen_addrs(msg.listenAddrs),
protocols: parse_protocols(msg.protocols),
observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
};

Ok(info)
}
}

impl TryFrom<proto::Identify> for PushInfo {
type Error = UpgradeError;

fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
let info = PushInfo {
public_key: parse_public_key(msg.publicKey),
protocol_version: msg.protocolVersion,
agent_version: msg.agentVersion,
listen_addrs: parse_listen_addrs(msg.listenAddrs),
protocols: parse_protocols(msg.protocols),
observed_addr: parse_observed_addr(msg.observedAddr),
};

Ok(info)
Expand Down Expand Up @@ -303,7 +392,7 @@ mod tests {
),
};

let info = Info::try_from(payload).expect("not to fail");
let info = PushInfo::try_from(payload).expect("not to fail");

assert_eq!(info.listen_addrs, vec![valid_multiaddr])
}
Expand Down