From 80ea2f6fd7eb8484318506254935c7c5fb0237ba Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 9 Apr 2020 16:15:17 +0300 Subject: [PATCH] feat: allow sent messages seen as subscribed (#1520) * feat: allow sent messages seen as subscribed minor feature to allow mimicing the behaviour expected by ipfs api tests. * refactor: rename per review comments * refactor: rename Floodsub::options to config * chore: update changelog * Update CHANGELOG.md Co-Authored-By: Max Inden Co-authored-by: Max Inden Co-authored-by: Pierre Krieger --- CHANGELOG.md | 2 ++ protocols/floodsub/src/layer.rs | 23 ++++++++++++++++------- protocols/floodsub/src/lib.rs | 21 +++++++++++++++++++++ protocols/floodsub/src/protocol.rs | 14 +++++++------- 4 files changed, 46 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05e66be6ba6..53459981bb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Version ??? +- `libp2p-floodsub`: Allow sent messages seen as subscribed. + [PR 1520](https://github.com/libp2p/rust-libp2p/pull/1520) # Version 0.17.0 (2020-04-02) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 1c837b2d0f0..4bb6aa08cbf 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -18,8 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; +use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::topic::Topic; +use crate::FloodsubConfig; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; @@ -43,8 +44,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - /// Peer id of the local node. Used for the source of the messages that we publish. - local_peer_id: PeerId, + config: FloodsubConfig, /// List of peers to send messages to. target_peers: FnvHashSet, @@ -64,11 +64,16 @@ pub struct Floodsub { } impl Floodsub { - /// Creates a `Floodsub`. + /// Creates a `Floodsub` with default configuration. pub fn new(local_peer_id: PeerId) -> Self { + Self::from_config(FloodsubConfig::new(local_peer_id)) + } + + /// Creates a `Floodsub` with the given configuration. + pub fn from_config(config: FloodsubConfig) -> Self { Floodsub { events: VecDeque::new(), - local_peer_id, + config, target_peers: FnvHashSet::default(), connected_peers: HashMap::new(), subscribed_topics: SmallVec::new(), @@ -190,7 +195,7 @@ impl Floodsub { fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { let message = FloodsubMessage { - source: self.local_peer_id.clone(), + source: self.config.local_peer_id.clone(), data: data.into(), // If the sequence numbers are predictable, then an attacker could flood the network // with packets with the predetermined sequence numbers and absorb our legitimate @@ -202,6 +207,10 @@ impl Floodsub { let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)); if self_subscribed { self.received.add(&message); + if self.config.subscribe_local_messages { + self.events.push_back( + NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone()))); + } } // Don't publish the message if we have to check subscriptions // and we're not subscribed ourselves to any of the topics. @@ -228,7 +237,7 @@ impl Floodsub { } impl NetworkBehaviour for Floodsub { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index cc9e840af79..8e7014bedaa 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -21,6 +21,8 @@ //! Implements the floodsub protocol, see also the: //! [spec](https://github.com/libp2p/specs/tree/master/pubsub). +use libp2p_core::PeerId; + pub mod protocol; mod layer; @@ -33,3 +35,22 @@ mod rpc_proto { pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::topic::Topic; + +/// Configuration options for the Floodsub protocol. +pub struct FloodsubConfig { + /// Peer id of the local node. Used for the source of the messages that we publish. + pub local_peer_id: PeerId, + + /// `true` if messages published by local node should be propagated as messages received from + /// the network, `false` by default. + pub subscribe_local_messages: bool, +} + +impl FloodsubConfig { + pub fn new(local_peer_id: PeerId) -> Self { + Self { + local_peer_id, + subscribe_local_messages: false + } + } +} diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 4df3975eddb..046c72d856b 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -27,16 +27,16 @@ use futures::{Future, io::{AsyncRead, AsyncWrite}}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] -pub struct FloodsubConfig {} +pub struct FloodsubProtocol {} -impl FloodsubConfig { - /// Builds a new `FloodsubConfig`. - pub fn new() -> FloodsubConfig { - FloodsubConfig {} +impl FloodsubProtocol { + /// Builds a new `FloodsubProtocol`. + pub fn new() -> FloodsubProtocol { + FloodsubProtocol {} } } -impl UpgradeInfo for FloodsubConfig { +impl UpgradeInfo for FloodsubProtocol { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -45,7 +45,7 @@ impl UpgradeInfo for FloodsubConfig { } } -impl InboundUpgrade for FloodsubConfig +impl InboundUpgrade for FloodsubProtocol where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, {