Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(identify): handle partial push messages #4495

Merged
merged 13 commits into from
Sep 24, 2023
Merged
58 changes: 43 additions & 15 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Identify, InboundPush, Info, OutboundPush, Push, UpgradeError};
use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError};
use crate::protocol::{Info, PushInfo};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
Expand Down Expand Up @@ -48,7 +49,7 @@ use std::{io, task::Context, task::Poll, time::Duration};
/// permitting the underlying connection to be closed.
pub struct Handler {
remote_peer_id: PeerId,
inbound_identify_push: Option<BoxFuture<'static, Result<Info, UpgradeError>>>,
inbound_identify_push: Option<BoxFuture<'static, Result<PushInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<Either<Identify, Push<OutboundPush>>, (), Event, io::Error>; 4],
Expand Down Expand Up @@ -80,6 +81,9 @@ pub struct Handler {
/// Address observed by or for the remote.
observed_addr: Multiaddr,

/// Identify information about the remote peer.
remote_info: Option<Info>,

local_supported_protocols: SupportedProtocols,
remote_supported_protocols: HashSet<StreamProtocol>,
external_addresses: HashSet<Multiaddr>,
Expand Down Expand Up @@ -133,6 +137,7 @@ impl Handler {
observed_addr,
local_supported_protocols: SupportedProtocols::default(),
remote_supported_protocols: HashSet::default(),
remote_info: Default::default(),
external_addresses,
}
}
Expand Down Expand Up @@ -175,12 +180,8 @@ impl Handler {
>,
) {
match output {
future::Either::Left(remote_info) => {
self.update_supported_protocols_for_remote(&remote_info);
self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
future::Either::Left(info) => {
self.handle_incoming_info(info);
}
future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
Expand Down Expand Up @@ -213,8 +214,38 @@ impl Handler {
}
}

fn update_supported_protocols_for_remote(&mut self, remote_info: &Info) {
let new_remote_protocols = HashSet::from_iter(remote_info.protocols.clone());
fn handle_incoming_info(&mut self, info: Info) {
self.remote_info.replace(info.clone());

self.update_supported_protocols_for_remote(&info);

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
}

fn handle_incoming_push_info(&mut self, push_info: PushInfo) {
if let Some(mut info) = self.remote_info.take() {
info.merge(push_info);
self.remote_info.replace(info.clone());

self.update_supported_protocols_for_remote(&info);

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
} else {
warn!(
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
"Failed to process push from {:?} because no identify info was received before",
self.remote_peer_id
)
}
}

fn update_supported_protocols_for_remote(&mut self, info: &Info) {
let new_remote_protocols = HashSet::from_iter(info.protocols.clone());
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

let remote_added_protocols = new_remote_protocols
.difference(&self.remote_supported_protocols)
Expand Down Expand Up @@ -318,11 +349,8 @@ impl ConnectionHandler for Handler {
{
self.inbound_identify_push.take();

if let Ok(info) = res {
self.update_supported_protocols_for_remote(&info);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
info,
)));
if let Ok(remote_info) = res {
self.handle_incoming_push_info(remote_info);
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
108 changes: 90 additions & 18 deletions protocols/identify/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Push<OutboundPush> {
}
}

/// Information of a peer sent in protocol messages.
/// Identify information of a peer sent in protocol messages.
#[derive(Debug, Clone)]
pub struct Info {
/// The public key of the local peer.
Expand All @@ -82,6 +82,61 @@ pub struct Info {
pub observed_addr: Multiaddr,
}

impl Info {
pub fn merge(&mut self, info: PushInfo) {
if let Some(public_key) = info.public_key {
self.public_key = public_key;
}
if let Some(protocol_version) = info.protocol_version {
self.protocol_version = protocol_version;
}
if let Some(agent_version) = info.agent_version {
self.agent_version = agent_version;
}
if !info.listen_addrs.is_empty() {
self.listen_addrs = info.listen_addrs;
}
if !info.protocols.is_empty() {
self.protocols = info.protocols;
}
if let Some(observed_addr) = info.observed_addr {
self.observed_addr = observed_addr;
}
}
}

/// Identify push information of a peer sent in protocol messages.
/// Note that missing fields should be ignored, as peers may choose to send partial updates containing only the fields whose values have changed.
#[derive(Debug, Clone)]
pub struct PushInfo {
pub public_key: Option<PublicKey>,
pub protocol_version: Option<String>,
pub agent_version: Option<String>,
pub listen_addrs: Vec<Multiaddr>,
pub protocols: Vec<StreamProtocol>,
pub observed_addr: Option<Multiaddr>,
}

#[derive(Debug)]
pub enum MissingInfoError {
PublicKey,
}

impl TryFrom<PushInfo> for Info {
type Error = MissingInfoError;

fn try_from(info: PushInfo) -> Result<Self, Self::Error> {
Ok(Info {
public_key: info.public_key.ok_or(MissingInfoError::PublicKey)?,
protocol_version: info.protocol_version.unwrap_or_default(),
agent_version: info.agent_version.unwrap_or_default(),
listen_addrs: info.listen_addrs,
protocols: info.protocols,
observed_addr: info.observed_addr.unwrap_or_else(Multiaddr::empty),
})
}
}

impl UpgradeInfo for Identify {
type Info = StreamProtocol;
type InfoIter = iter::Once<Self::Info>;
Expand Down Expand Up @@ -110,7 +165,11 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
recv(socket).boxed()
recv(socket)
.map(|result| {
result.and_then(|info| info.try_into().map_err(UpgradeError::MissingInfo))
})
.boxed()
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -127,7 +186,7 @@ impl<C> InboundUpgrade<C> for Push<InboundPush>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = BoxFuture<'static, Result<Info, UpgradeError>>;
type Output = BoxFuture<'static, Result<PushInfo, UpgradeError>>;
type Error = Void;
type Future = future::Ready<Result<Self::Output, Self::Error>>;

Expand Down Expand Up @@ -184,7 +243,7 @@ where
Ok(())
}

async fn recv<T>(socket: T) -> Result<Info, UpgradeError>
async fn recv<T>(socket: T) -> Result<PushInfo, UpgradeError>
where
T: AsyncRead + AsyncWrite + Unpin,
{
Expand All @@ -207,7 +266,7 @@ where
Ok(info)
}

impl TryFrom<proto::Identify> for Info {
impl TryFrom<proto::Identify> for PushInfo {
type Error = UpgradeError;

fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
Expand All @@ -221,26 +280,37 @@ impl TryFrom<proto::Identify> for Info {
match parse_multiaddr(addr) {
Ok(a) => addrs.push(a),
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
debug!("Unable to parse listen multiaddr: {e:?}");
}
}
}
addrs
};

let public_key = PublicKey::try_decode_protobuf(&msg.publicKey.unwrap_or_default())?;
let public_key = msg
.publicKey
.and_then(|key| match PublicKey::try_decode_protobuf(&key) {
Ok(k) => Some(k),
Err(e) => {
debug!("Unable to decode public key: {e:?}");
None
}
});

let observed_addr = msg
.observedAddr
.and_then(|bytes| match parse_multiaddr(bytes) {
Ok(a) => Some(a),
Err(e) => {
debug!("Unable to parse observed multiaddr: {e:?}");
None
}
});

let observed_addr = match parse_multiaddr(msg.observedAddr.unwrap_or_default()) {
Ok(a) => a,
Err(e) => {
debug!("Unable to parse multiaddr: {e:?}");
Multiaddr::empty()
}
};
let info = Info {
let info = PushInfo {
public_key,
protocol_version: msg.protocolVersion.unwrap_or_default(),
agent_version: msg.agentVersion.unwrap_or_default(),
protocol_version: msg.protocolVersion,
agent_version: msg.agentVersion,
listen_addrs,
protocols: msg
.protocols
Expand Down Expand Up @@ -268,6 +338,8 @@ pub enum UpgradeError {
Io(#[from] io::Error),
#[error("Stream closed")]
StreamClosed,
#[error("Missing information received")]
MissingInfo(MissingInfoError),
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
#[error("Failed decoding multiaddr")]
Multiaddr(#[from] multiaddr::Error),
#[error("Failed decoding public key")]
Expand Down Expand Up @@ -303,7 +375,7 @@ mod tests {
),
};

let info = Info::try_from(payload).expect("not to fail");
let info = PushInfo::try_from(payload).expect("not to fail");

assert_eq!(info.listen_addrs, vec![valid_multiaddr])
}
Expand Down