From ab59af4d46e0f6835429204f622475adcb6b74df Mon Sep 17 00:00:00 2001 From: StemCll Date: Fri, 27 Jan 2023 05:44:04 +0100 Subject: [PATCH] refactor(gossipsub): revise symbol naming to follow conventions (#3303) Changes regarding the #2217 --- examples/gossipsub-chat.rs | 26 +- examples/ipfs-private.rs | 19 +- misc/metrics/src/gossipsub.rs | 6 +- misc/metrics/src/lib.rs | 4 +- protocols/gossipsub/CHANGELOG.md | 8 + protocols/gossipsub/src/behaviour.rs | 240 +++++----- protocols/gossipsub/src/behaviour/tests.rs | 421 ++++++++---------- protocols/gossipsub/src/config.rs | 107 +++-- protocols/gossipsub/src/error.rs | 14 +- protocols/gossipsub/src/handler.rs | 46 +- protocols/gossipsub/src/lib.rs | 86 +++- protocols/gossipsub/src/mcache.rs | 28 +- protocols/gossipsub/src/peer_score/tests.rs | 12 +- protocols/gossipsub/src/protocol.rs | 81 ++-- .../gossipsub/src/subscription_filter.rs | 74 +-- protocols/gossipsub/src/transform.rs | 24 +- protocols/gossipsub/src/types.rs | 48 +- protocols/gossipsub/tests/smoke.rs | 28 +- 18 files changed, 639 insertions(+), 633 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 9150aa64624..0353b72ec5e 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -47,11 +47,6 @@ use async_std::io; use futures::{prelude::*, select}; -use libp2p::gossipsub::MessageId; -use libp2p::gossipsub::{ - Gossipsub, GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, - ValidationMode, -}; use libp2p::{ gossipsub, identity, mdns, swarm::NetworkBehaviour, swarm::SwarmEvent, PeerId, Swarm, }; @@ -73,31 +68,34 @@ async fn main() -> Result<(), Box> { // We create a custom network behaviour that combines Gossipsub and Mdns. #[derive(NetworkBehaviour)] struct MyBehaviour { - gossipsub: Gossipsub, + gossipsub: gossipsub::Behaviour, mdns: mdns::async_io::Behaviour, } // To content-address message, we can take the hash of message and use it as an ID. - let message_id_fn = |message: &GossipsubMessage| { + let message_id_fn = |message: &gossipsub::Message| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); - MessageId::from(s.finish().to_string()) + gossipsub::MessageId::from(s.finish().to_string()) }; // Set a custom gossipsub configuration - let gossipsub_config = gossipsub::GossipsubConfigBuilder::default() + let gossipsub_config = gossipsub::ConfigBuilder::default() .heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space - .validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing) + .validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing) .message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated. .build() .expect("Valid config"); // build a gossipsub network behaviour - let mut gossipsub = Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config) - .expect("Correct configuration"); + let mut gossipsub = gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Signed(local_key), + gossipsub_config, + ) + .expect("Correct configuration"); // Create a Gossipsub topic - let topic = Topic::new("test-net"); + let topic = gossipsub::IdentTopic::new("test-net"); // subscribes to our topic gossipsub.subscribe(&topic)?; @@ -140,7 +138,7 @@ async fn main() -> Result<(), Box> { swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); } }, - SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(GossipsubEvent::Message { + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { propagation_source: peer_id, message_id: id, message, diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index af0eab7aedf..446fa5d2bf6 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -36,8 +36,7 @@ use either::Either; use futures::{prelude::*, select}; use libp2p::{ core::{muxing::StreamMuxerBox, transport, transport::upgrade::Version}, - gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity}, - identify, identity, + gossipsub, identify, identity, multiaddr::Protocol, noise, ping, pnet::{PnetConfig, PreSharedKey}, @@ -150,19 +149,19 @@ async fn main() -> Result<(), Box> { #[derive(NetworkBehaviour)] #[behaviour(out_event = "MyBehaviourEvent")] struct MyBehaviour { - gossipsub: Gossipsub, + gossipsub: gossipsub::Behaviour, identify: identify::Behaviour, ping: ping::Behaviour, } enum MyBehaviourEvent { - Gossipsub(GossipsubEvent), + Gossipsub(gossipsub::Event), Identify(identify::Event), Ping(ping::Event), } - impl From for MyBehaviourEvent { - fn from(event: GossipsubEvent) -> Self { + impl From for MyBehaviourEvent { + fn from(event: gossipsub::Event) -> Self { MyBehaviourEvent::Gossipsub(event) } } @@ -181,13 +180,13 @@ async fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let gossipsub_config = GossipsubConfigBuilder::default() + let gossipsub_config = gossipsub::ConfigBuilder::default() .max_transmit_size(262144) .build() .expect("valid config"); let mut behaviour = MyBehaviour { - gossipsub: Gossipsub::new( - MessageAuthenticity::Signed(local_key.clone()), + gossipsub: gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Signed(local_key.clone()), gossipsub_config, ) .expect("Valid configuration"), @@ -236,7 +235,7 @@ async fn main() -> Result<(), Box> { SwarmEvent::Behaviour(MyBehaviourEvent::Identify(event)) => { println!("identify: {event:?}"); } - SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(GossipsubEvent::Message { + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { propagation_source: peer_id, message_id: id, message, diff --git a/misc/metrics/src/gossipsub.rs b/misc/metrics/src/gossipsub.rs index 4f4bf490d27..b00671ac636 100644 --- a/misc/metrics/src/gossipsub.rs +++ b/misc/metrics/src/gossipsub.rs @@ -36,9 +36,9 @@ impl Metrics { } } -impl super::Recorder for Metrics { - fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { - if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event { +impl super::Recorder for Metrics { + fn record(&self, event: &libp2p_gossipsub::Event) { + if let libp2p_gossipsub::Event::Message { .. } = event { self.messages.inc(); } } diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index aa9f3d924e7..ec166eebecc 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -108,8 +108,8 @@ impl Recorder for Metrics { #[cfg(feature = "gossipsub")] #[cfg(not(target_os = "unknown"))] -impl Recorder for Metrics { - fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { +impl Recorder for Metrics { + fn record(&self, event: &libp2p_gossipsub::Event) { self.gossipsub.record(event) } } diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 335ed0bcf6e..c24178fcb94 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -8,8 +8,16 @@ - Initialize `ProtocolConfig` via `GossipsubConfig`. See [PR 3381]. +- Rename types as per [discussion 2174]. + `Gossipsub` has been renamed to `Behaviour`. + The `Gossipsub` prefix has been removed from various types like `GossipsubConfig` or `GossipsubMessage`. + It is preferred to import the gossipsub protocol as a module (`use libp2p::gossipsub;`), and refer to its types via `gossipsub::`. + For example: `gossipsub::Behaviour` or `gossipsub::RawMessage`. See [PR 3303]. + [PR 3207]: https://github.com/libp2p/rust-libp2p/pull/3207/ +[PR 3303]: https://github.com/libp2p/rust-libp2p/pull/3303/ [PR 3381]: https://github.com/libp2p/rust-libp2p/pull/3381/ +[discussion 2174]: https://github.com/libp2p/rust-libp2p/discussions/2174 # 0.43.0 diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index f8db69a4d24..eccecd3a498 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -32,7 +32,7 @@ use std::{ use futures::StreamExt; use log::{debug, error, trace, warn}; use prometheus_client::registry::Registry; -use prost::Message; +use prost::Message as _; use rand::{seq::SliceRandom, thread_rng}; use libp2p_core::{ @@ -47,10 +47,10 @@ use libp2p_swarm::{ use wasm_timer::Instant; use crate::backoff::BackoffStorage; -use crate::config::{GossipsubConfig, ValidationMode}; +use crate::config::{Config, ValidationMode}; use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; -use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; +use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; @@ -60,10 +60,10 @@ use crate::time_cache::{DuplicateCache, TimeCache}; use crate::topic::{Hasher, Topic, TopicHash}; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::{ - FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription, - GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage, + ControlAction, FastMessageId, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, + Subscription, SubscriptionAction, }; -use crate::types::{GossipsubRpc, PeerConnections, PeerKind}; +use crate::types::{PeerConnections, PeerKind, Rpc}; use crate::{rpc_proto, TopicScoreParams}; use std::{cmp::Ordering::Equal, fmt::Debug}; use wasm_timer::Interval; @@ -76,7 +76,7 @@ mod tests; /// Without signing, a number of privacy preserving modes can be selected. /// /// NOTE: The default validation settings are to require signatures. The [`ValidationMode`] -/// should be updated in the [`GossipsubConfig`] to allow for unsigned messages. +/// should be updated in the [`Config`] to allow for unsigned messages. #[derive(Clone)] pub enum MessageAuthenticity { /// Message signing is enabled. The author will be the owner of the key and the sequence number @@ -97,7 +97,7 @@ pub enum MessageAuthenticity { /// The author of the message and the sequence numbers are excluded from the message. /// /// NOTE: Excluding these fields may make these messages invalid by other nodes who - /// enforce validation of these fields. See [`ValidationMode`] in the [`GossipsubConfig`] + /// enforce validation of these fields. See [`ValidationMode`] in the [`Config`] /// for how to customise this for rust-libp2p gossipsub. A custom `message_id` /// function will need to be set to prevent all messages from a peer being filtered /// as duplicates. @@ -117,7 +117,7 @@ impl MessageAuthenticity { /// Event that can be emitted by the gossipsub behaviour. #[derive(Debug)] -pub enum GossipsubEvent { +pub enum Event { /// A message has been received. Message { /// The peer that forwarded us this message. @@ -126,7 +126,7 @@ pub enum GossipsubEvent { /// validating a message (if required). message_id: MessageId, /// The decompressed message itself. - message: GossipsubMessage, + message: Message, }, /// A remote subscribed to a topic. Subscribed { @@ -201,7 +201,7 @@ impl From for PublishConfig { /// Network behaviour that handles the gossipsub protocol. /// -/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If +/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If /// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an /// appropriate level to accept unsigned messages. /// @@ -210,15 +210,15 @@ impl From for PublishConfig { /// /// The TopicSubscriptionFilter allows applications to implement specific filters on topics to /// prevent unwanted messages being propagated and evaluated. -pub struct Gossipsub { +pub struct Behaviour { /// Configuration providing gossipsub performance parameters. - config: GossipsubConfig, + config: Config, /// Events that need to be yielded to the outside when polling. - events: VecDeque>, + events: VecDeque>, /// Pools non-urgent control messages between heartbeats. - control_pool: HashMap>, + control_pool: HashMap>, /// Information used for publishing messages. publish_config: PublishConfig, @@ -310,17 +310,14 @@ pub struct Gossipsub { metrics: Option, } -impl Gossipsub +impl Behaviour where D: DataTransform + Default, F: TopicSubscriptionFilter + Default, { - /// Creates a [`Gossipsub`] struct given a set of parameters specified via a - /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. - pub fn new( - privacy: MessageAuthenticity, - config: GossipsubConfig, - ) -> Result { + /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a + /// [`Config`]. This has no subscription filter and uses no compression. + pub fn new(privacy: MessageAuthenticity, config: Config) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, @@ -330,12 +327,12 @@ where ) } - /// Creates a [`Gossipsub`] struct given a set of parameters specified via a - /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. + /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a + /// [`Config`]. This has no subscription filter and uses no compression. /// Metrics can be evaluated by passing a reference to a [`Registry`]. pub fn new_with_metrics( privacy: MessageAuthenticity, - config: GossipsubConfig, + config: Config, metrics_registry: &mut Registry, metrics_config: MetricsConfig, ) -> Result { @@ -349,16 +346,16 @@ where } } -impl Gossipsub +impl Behaviour where D: DataTransform + Default, F: TopicSubscriptionFilter, { - /// Creates a [`Gossipsub`] struct given a set of parameters specified via a - /// [`GossipsubConfig`] and a custom subscription filter. + /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a + /// [`Config`] and a custom subscription filter. pub fn new_with_subscription_filter( privacy: MessageAuthenticity, - config: GossipsubConfig, + config: Config, metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, ) -> Result { @@ -372,16 +369,16 @@ where } } -impl Gossipsub +impl Behaviour where D: DataTransform, F: TopicSubscriptionFilter + Default, { - /// Creates a [`Gossipsub`] struct given a set of parameters specified via a - /// [`GossipsubConfig`] and a custom data transform. + /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a + /// [`Config`] and a custom data transform. pub fn new_with_transform( privacy: MessageAuthenticity, - config: GossipsubConfig, + config: Config, metrics: Option<(&mut Registry, MetricsConfig)>, data_transform: D, ) -> Result { @@ -395,16 +392,16 @@ where } } -impl Gossipsub +impl Behaviour where D: DataTransform, F: TopicSubscriptionFilter, { - /// Creates a [`Gossipsub`] struct given a set of parameters specified via a - /// [`GossipsubConfig`] and a custom subscription filter and data transform. + /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a + /// [`Config`] and a custom subscription filter and data transform. pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, - config: GossipsubConfig, + config: Config, metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, data_transform: D, @@ -415,7 +412,7 @@ where // were received locally. validate_config(&privacy, config.validation_mode())?; - Ok(Gossipsub { + Ok(Behaviour { metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), events: VecDeque::new(), control_pool: HashMap::new(), @@ -455,7 +452,7 @@ where } } -impl Gossipsub +impl Behaviour where D: DataTransform + Send + 'static, F: TopicSubscriptionFilter + Send + 'static, @@ -516,11 +513,11 @@ where // send subscription request to all peers let peer_list = self.peer_topics.keys().cloned().collect::>(); if !peer_list.is_empty() { - let event = GossipsubRpc { + let event = Rpc { messages: Vec::new(), - subscriptions: vec![GossipsubSubscription { + subscriptions: vec![Subscription { topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, + action: SubscriptionAction::Subscribe, }], control_msgs: Vec::new(), } @@ -556,11 +553,11 @@ where // announce to all peers let peer_list = self.peer_topics.keys().cloned().collect::>(); if !peer_list.is_empty() { - let event = GossipsubRpc { + let event = Rpc { messages: Vec::new(), - subscriptions: vec![GossipsubSubscription { + subscriptions: vec![Subscription { topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Unsubscribe, + action: SubscriptionAction::Unsubscribe, }], control_msgs: Vec::new(), } @@ -597,14 +594,14 @@ where let raw_message = self.build_raw_message(topic, transformed_data)?; // calculate the message id from the un-transformed data - let msg_id = self.config.message_id(&GossipsubMessage { + let msg_id = self.config.message_id(&Message { source: raw_message.source, data, // the uncompressed form sequence_number: raw_message.sequence_number, topic: raw_message.topic.clone(), }); - let event = GossipsubRpc { + let event = Rpc { subscriptions: Vec::new(), messages: vec![raw_message.clone()], control_msgs: Vec::new(), @@ -742,7 +739,7 @@ where Ok(msg_id) } - /// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after + /// This function should be called when [`Config::validate_messages()`] is `true` after /// the message got validated by the caller. Messages are stored in the ['Memcache'] and /// validation is expected to be fast enough that the messages should still exist in the cache. /// There are three possible validation outcomes and the outcome is given in acceptance. @@ -1011,7 +1008,7 @@ where Self::control_pool_add( &mut self.control_pool, peer_id, - GossipsubControlAction::Graft { + ControlAction::Graft { topic_hash: topic_hash.clone(), }, ); @@ -1042,7 +1039,7 @@ where peer: &PeerId, do_px: bool, on_unsubscribe: bool, - ) -> GossipsubControlAction { + ) -> ControlAction { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer, topic_hash.clone()); } @@ -1053,7 +1050,7 @@ where } Some(PeerKind::Gossipsub) => { // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway - return GossipsubControlAction::Prune { + return ControlAction::Prune { topic_hash: topic_hash.clone(), peers: Vec::new(), backoff: None, @@ -1090,7 +1087,7 @@ where // update backoff self.backoffs.update_backoff(topic_hash, peer, backoff); - GossipsubControlAction::Prune { + ControlAction::Prune { topic_hash: topic_hash.clone(), peers, backoff: Some(backoff.as_secs()), @@ -1285,7 +1282,7 @@ where Self::control_pool_add( &mut self.control_pool, *peer_id, - GossipsubControlAction::IWant { + ControlAction::IWant { message_ids: iwant_ids_vec, }, ); @@ -1335,7 +1332,7 @@ where .map(|message| message.topic.clone()) .collect::>(); - let message = GossipsubRpc { + let message = Rpc { subscriptions: Vec::new(), messages: message_list, control_msgs: Vec::new(), @@ -1510,7 +1507,7 @@ where if let Err(e) = self.send_message( *peer_id, - GossipsubRpc { + Rpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: prune_messages, @@ -1648,7 +1645,7 @@ where fn message_is_valid( &mut self, msg_id: &MessageId, - raw_message: &mut RawGossipsubMessage, + raw_message: &mut RawMessage, propagation_source: &PeerId, ) -> bool { debug!( @@ -1719,12 +1716,12 @@ where true } - /// Handles a newly received [`RawGossipsubMessage`]. + /// Handles a newly received [`RawMessage`]. /// /// Forwards the message to all peers in the mesh. fn handle_received_message( &mut self, - mut raw_message: RawGossipsubMessage, + mut raw_message: RawMessage, propagation_source: &PeerId, ) { // Record the received metric @@ -1821,13 +1818,12 @@ where // Dispatch the message to the user if we are subscribed to any of the topics if self.mesh.contains_key(&message.topic) { debug!("Sending received message to user"); - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::Message { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { propagation_source: *propagation_source, message_id: msg_id.clone(), message, - }, - )); + })); } else { debug!( "Received message on a topic we are not subscribed to: {:?}", @@ -1857,7 +1853,7 @@ where fn handle_invalid_message( &mut self, propagation_source: &PeerId, - raw_message: &RawGossipsubMessage, + raw_message: &RawMessage, reject_reason: RejectReason, ) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { @@ -1891,7 +1887,7 @@ where /// Handles received subscriptions. fn handle_received_subscriptions( &mut self, - subscriptions: &[GossipsubSubscription], + subscriptions: &[Subscription], propagation_source: &PeerId, ) { debug!( @@ -1943,7 +1939,7 @@ where .or_insert_with(Default::default); match subscription.action { - GossipsubSubscriptionAction::Subscribe => { + SubscriptionAction::Subscribe => { if peer_list.insert(*propagation_source) { debug!( "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}", @@ -2000,13 +1996,13 @@ where } // generates a subscription event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::Subscribed { + Event::Subscribed { peer_id: *propagation_source, topic: topic_hash.clone(), }, )); } - GossipsubSubscriptionAction::Unsubscribe => { + SubscriptionAction::Unsubscribe => { if peer_list.remove(propagation_source) { debug!( "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}", @@ -2020,7 +2016,7 @@ where unsubscribed_peers.push((*propagation_source, topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::Unsubscribed { + Event::Unsubscribed { peer_id: *propagation_source, topic: topic_hash.clone(), }, @@ -2057,12 +2053,12 @@ where && self .send_message( *propagation_source, - GossipsubRpc { + Rpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: topics_to_graft .into_iter() - .map(|topic_hash| GossipsubControlAction::Graft { topic_hash }) + .map(|topic_hash| ControlAction::Graft { topic_hash }) .collect(), } .into_protobuf(), @@ -2557,7 +2553,7 @@ where Self::control_pool_add( &mut self.control_pool, peer, - GossipsubControlAction::IHave { + ControlAction::IHave { topic_hash: topic_hash.clone(), message_ids: peer_message_ids, }, @@ -2593,9 +2589,9 @@ where &self.connected_peers, ); } - let mut control_msgs: Vec = topics + let mut control_msgs: Vec = topics .iter() - .map(|topic_hash| GossipsubControlAction::Graft { + .map(|topic_hash| ControlAction::Graft { topic_hash: topic_hash.clone(), }) .collect(); @@ -2626,7 +2622,7 @@ where if self .send_message( peer, - GossipsubRpc { + Rpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs, @@ -2666,7 +2662,7 @@ where if self .send_message( *peer, - GossipsubRpc { + Rpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: remaining_prunes, @@ -2686,7 +2682,7 @@ where fn forward_msg( &mut self, msg_id: &MessageId, - message: RawGossipsubMessage, + message: RawMessage, propagation_source: Option<&PeerId>, originating_peers: HashSet, ) -> Result { @@ -2733,7 +2729,7 @@ where // forward the message to peers if !recipient_peers.is_empty() { - let event = GossipsubRpc { + let event = Rpc { subscriptions: Vec::new(), messages: vec![message.clone()], control_msgs: Vec::new(), @@ -2755,12 +2751,12 @@ where } } - /// Constructs a [`RawGossipsubMessage`] performing message signing if required. + /// Constructs a [`RawMessage`] performing message signing if required. pub(crate) fn build_raw_message( &self, topic: TopicHash, data: Vec, - ) -> Result { + ) -> Result { match &self.publish_config { PublishConfig::Signing { ref keypair, @@ -2791,7 +2787,7 @@ where Some(keypair.sign(&signature_bytes)?) }; - Ok(RawGossipsubMessage { + Ok(RawMessage { source: Some(*author), data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2804,7 +2800,7 @@ where }) } PublishConfig::Author(peer_id) => { - Ok(RawGossipsubMessage { + Ok(RawMessage { source: Some(*peer_id), data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2817,7 +2813,7 @@ where }) } PublishConfig::RandomAuthor => { - Ok(RawGossipsubMessage { + Ok(RawMessage { source: Some(PeerId::random()), data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2830,7 +2826,7 @@ where }) } PublishConfig::Anonymous => { - Ok(RawGossipsubMessage { + Ok(RawMessage { source: None, data, // To be interoperable with the go-implementation this is treated as a 64-bit @@ -2847,9 +2843,9 @@ where // adds a control action to control_pool fn control_pool_add( - control_pool: &mut HashMap>, + control_pool: &mut HashMap>, peer: PeerId, - control: GossipsubControlAction, + control: ControlAction, ) { control_pool .entry(peer) @@ -2863,7 +2859,7 @@ where if self .send_message( peer, - GossipsubRpc { + Rpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: controls, @@ -2880,7 +2876,7 @@ where self.pending_iwant_msgs.clear(); } - /// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it + /// Send a [`Rpc`] message to a peer. This will wrap the message in an arc if it /// is not already an arc. fn send_message( &mut self, @@ -2897,7 +2893,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: GossipsubHandlerIn::Message(message), + event: HandlerIn::Message(message), handler: NotifyHandler::Any, }) } @@ -3080,9 +3076,9 @@ where // We need to send our subscriptions to the newly-connected node. let mut subscriptions = vec![]; for topic_hash in self.mesh.keys() { - subscriptions.push(GossipsubSubscription { + subscriptions.push(Subscription { topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, + action: SubscriptionAction::Subscribe, }); } @@ -3091,7 +3087,7 @@ where if self .send_message( peer_id, - GossipsubRpc { + Rpc { messages: Vec::new(), subscriptions, control_msgs: Vec::new(), @@ -3157,7 +3153,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: GossipsubHandlerIn::JoinedMesh, + event: HandlerIn::JoinedMesh, handler: NotifyHandler::One(connections.connections[0]), }); break; @@ -3289,16 +3285,16 @@ fn get_ip_addr(addr: &Multiaddr) -> Option { }) } -impl NetworkBehaviour for Gossipsub +impl NetworkBehaviour for Behaviour where C: Send + 'static + DataTransform, F: Send + 'static + TopicSubscriptionFilter, { - type ConnectionHandler = GossipsubHandler; - type OutEvent = GossipsubEvent; + type ConnectionHandler = Handler; + type OutEvent = Event; fn new_handler(&mut self) -> Self::ConnectionHandler { - GossipsubHandler::new( + Handler::new( ProtocolConfig::new(&self.config), self.config.idle_timeout(), ) @@ -3324,7 +3320,7 @@ where propagation_source ); self.events.push_back(NetworkBehaviourAction::GenerateEvent( - GossipsubEvent::GossipsubNotSupported { + Event::GossipsubNotSupported { peer_id: propagation_source, }, )); @@ -3401,17 +3397,17 @@ where let mut prune_msgs = vec![]; for control_msg in rpc.control_msgs { match control_msg { - GossipsubControlAction::IHave { + ControlAction::IHave { topic_hash, message_ids, } => { ihave_msgs.push((topic_hash, message_ids)); } - GossipsubControlAction::IWant { message_ids } => { + ControlAction::IWant { message_ids } => { self.handle_iwant(&propagation_source, message_ids) } - GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash), - GossipsubControlAction::Prune { + ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash), + ControlAction::Prune { topic_hash, peers, backoff, @@ -3484,7 +3480,7 @@ fn peer_added_to_mesh( new_topics: Vec<&TopicHash>, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque>, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3512,7 +3508,7 @@ fn peer_added_to_mesh( // This is the first mesh the peer has joined, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: GossipsubHandlerIn::JoinedMesh, + event: HandlerIn::JoinedMesh, handler: NotifyHandler::One(connection_id), }); } @@ -3525,7 +3521,7 @@ fn peer_removed_from_mesh( old_topic: &TopicHash, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque>, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3551,7 +3547,7 @@ fn peer_removed_from_mesh( // The peer is not in any other mesh, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: GossipsubHandlerIn::LeftMesh, + event: HandlerIn::LeftMesh, handler: NotifyHandler::One(*connection_id), }); } @@ -3641,9 +3637,9 @@ fn validate_config( Ok(()) } -impl fmt::Debug for Gossipsub { +impl fmt::Debug for Behaviour { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Gossipsub") + f.debug_struct("Behaviour") .field("config", &self.config) .field("events", &self.events.len()) .field("control_pool", &self.control_pool) @@ -3681,16 +3677,16 @@ mod local_test { use asynchronous_codec::Encoder; use quickcheck::*; - fn empty_rpc() -> GossipsubRpc { - GossipsubRpc { + fn empty_rpc() -> Rpc { + Rpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: Vec::new(), } } - fn test_message() -> RawGossipsubMessage { - RawGossipsubMessage { + fn test_message() -> RawMessage { + RawMessage { source: Some(PeerId::random()), data: vec![0; 100], sequence_number: None, @@ -3701,21 +3697,21 @@ mod local_test { } } - fn test_subscription() -> GossipsubSubscription { - GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, + fn test_subscription() -> Subscription { + Subscription { + action: SubscriptionAction::Subscribe, topic_hash: IdentTopic::new("TestTopic").hash(), } } - fn test_control() -> GossipsubControlAction { - GossipsubControlAction::IHave { + fn test_control() -> ControlAction { + ControlAction::IHave { topic_hash: IdentTopic::new("TestTopic").hash(), message_ids: vec![MessageId(vec![12u8]); 5], } } - impl Arbitrary for GossipsubRpc { + impl Arbitrary for Rpc { fn arbitrary(g: &mut Gen) -> Self { let mut rpc = empty_rpc(); @@ -3736,12 +3732,12 @@ mod local_test { /// Tests RPC message fragmentation fn test_message_fragmentation_deterministic() { let max_transmit_size = 500; - let config = crate::GossipsubConfigBuilder::default() + let config = crate::config::ConfigBuilder::default() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); + let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap(); // Message under the limit should be fine. let mut rpc = empty_rpc(); @@ -3782,14 +3778,14 @@ mod local_test { #[test] fn test_message_fragmentation() { - fn prop(rpc: GossipsubRpc) { + fn prop(rpc: Rpc) { let max_transmit_size = 500; - let config = crate::GossipsubConfigBuilder::default() + let config = crate::config::ConfigBuilder::default() .max_transmit_size(max_transmit_size) .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); + let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap(); let mut length_codec = unsigned_varint::codec::UviBytes::default(); length_codec.set_max_len(max_transmit_size); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index e3287aef0cb..18e08e0ebb4 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -26,8 +26,7 @@ use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; use crate::{ - GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, IdentTopic as Topic, - TopicScoreParams, + config::Config, config::ConfigBuilder, IdentTopic as Topic, Message, TopicScoreParams, }; use async_std::net::Ipv4Addr; use byteorder::{BigEndian, ByteOrder}; @@ -49,7 +48,7 @@ where peer_no: usize, topics: Vec, to_subscribe: bool, - gs_config: GossipsubConfig, + gs_config: Config, explicit: usize, outbound: usize, scoring: Option<(PeerScoreParams, PeerScoreThresholds)>, @@ -62,10 +61,10 @@ where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { - pub fn create_network(self) -> (Gossipsub, Vec, Vec) { + pub fn create_network(self) -> (Behaviour, Vec, Vec) { let keypair = libp2p_core::identity::Keypair::generate_ed25519(); // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new_with_subscription_filter_and_transform( + let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, None, @@ -124,7 +123,7 @@ where self } - fn gs_config(mut self, gs_config: GossipsubConfig) -> Self { + fn gs_config(mut self, gs_config: Config) -> Self { self.gs_config = gs_config; self } @@ -165,7 +164,7 @@ fn inject_nodes1() -> InjectNodes // helper functions for testing fn add_peer( - gs: &mut Gossipsub, + gs: &mut Behaviour, topic_hashes: &Vec, outbound: bool, explicit: bool, @@ -178,7 +177,7 @@ where } fn add_peer_with_addr( - gs: &mut Gossipsub, + gs: &mut Behaviour, topic_hashes: &Vec, outbound: bool, explicit: bool, @@ -199,7 +198,7 @@ where } fn add_peer_with_addr_and_kind( - gs: &mut Gossipsub, + gs: &mut Behaviour, topic_hashes: &Vec, outbound: bool, explicit: bool, @@ -243,8 +242,8 @@ where &topic_hashes .iter() .cloned() - .map(|t| GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, + .map(|t| Subscription { + action: SubscriptionAction::Subscribe, topic_hash: t, }) .collect::>(), @@ -254,7 +253,7 @@ where peer } -fn disconnect_peer(gs: &mut Gossipsub, peer_id: &PeerId) +fn disconnect_peer(gs: &mut Behaviour, peer_id: &PeerId) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -270,10 +269,8 @@ where for connection_id in peer_connections.connections.clone() { active_connections = active_connections.checked_sub(1).unwrap(); - let dummy_handler = GossipsubHandler::new( - ProtocolConfig::new(&GossipsubConfig::default()), - Duration::ZERO, - ); + let dummy_handler = + Handler::new(ProtocolConfig::new(&Config::default()), Duration::ZERO); gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id: *peer_id, @@ -287,12 +284,12 @@ where } // Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue. -fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { +fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> Rpc { // Store valid messages. let mut messages = Vec::with_capacity(rpc.publish.len()); let rpc = rpc.clone(); for message in rpc.publish.into_iter() { - messages.push(RawGossipsubMessage { + messages.push(RawMessage { source: message.from.map(|x| PeerId::from_bytes(&x).unwrap()), data: message.data.unwrap_or_default(), sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application @@ -305,10 +302,10 @@ fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { let mut control_msgs = Vec::new(); if let Some(rpc_control) = rpc.control { // Collect the gossipsub control messages - let ihave_msgs: Vec = rpc_control + let ihave_msgs: Vec = rpc_control .ihave .into_iter() - .map(|ihave| GossipsubControlAction::IHave { + .map(|ihave| ControlAction::IHave { topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), message_ids: ihave .message_ids @@ -318,10 +315,10 @@ fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { }) .collect(); - let iwant_msgs: Vec = rpc_control + let iwant_msgs: Vec = rpc_control .iwant .into_iter() - .map(|iwant| GossipsubControlAction::IWant { + .map(|iwant| ControlAction::IWant { message_ids: iwant .message_ids .into_iter() @@ -330,10 +327,10 @@ fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { }) .collect(); - let graft_msgs: Vec = rpc_control + let graft_msgs: Vec = rpc_control .graft .into_iter() - .map(|graft| GossipsubControlAction::Graft { + .map(|graft| ControlAction::Graft { topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), }) .collect(); @@ -357,7 +354,7 @@ fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { .collect::>(); let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default()); - prune_msgs.push(GossipsubControlAction::Prune { + prune_msgs.push(ControlAction::Prune { topic_hash, peers, backoff: prune.backoff, @@ -370,16 +367,16 @@ fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { control_msgs.extend(prune_msgs); } - GossipsubRpc { + Rpc { messages, subscriptions: rpc .subscriptions .into_iter() - .map(|sub| GossipsubSubscription { + .map(|sub| Subscription { action: if Some(true) == sub.subscribe { - GossipsubSubscriptionAction::Subscribe + SubscriptionAction::Subscribe } else { - GossipsubSubscriptionAction::Unsubscribe + SubscriptionAction::Unsubscribe }, topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), }) @@ -414,7 +411,7 @@ fn test_subscribe() { .iter() .fold(vec![], |mut collected_subscriptions, e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref message), + event: HandlerIn::Message(ref message), .. } => { for s in &message.subscriptions { @@ -482,7 +479,7 @@ fn test_unsubscribe() { .iter() .fold(vec![], |mut collected_subscriptions, e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref message), + event: HandlerIn::Message(ref message), .. } => { for s in &message.subscriptions { @@ -557,11 +554,11 @@ fn test_join() { ); fn collect_grafts( - mut collected_grafts: Vec, - (_, controls): (&PeerId, &Vec), - ) -> Vec { + mut collected_grafts: Vec, + (_, controls): (&PeerId, &Vec), + ) -> Vec { for c in controls.iter() { - if let GossipsubControlAction::Graft { topic_hash: _ } = c { + if let ControlAction::Graft { topic_hash: _ } = c { collected_grafts.push(c.clone()) } } @@ -635,7 +632,7 @@ fn test_publish_without_flood_publishing() { // - Insert message into gs.mcache and gs.received //turn off flood publish to test old behaviour - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -670,7 +667,7 @@ fn test_publish_without_flood_publishing() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref message), + event: HandlerIn::Message(ref message), .. } => { let event = proto_to_message(message); @@ -695,7 +692,7 @@ fn test_publish_without_flood_publishing() { let msg_id = gs.config.message_id(message); - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); assert_eq!( publishes.len(), config.mesh_n_low(), @@ -717,7 +714,7 @@ fn test_fanout() { // - Insert message into gs.mcache and gs.received //turn off flood publish to test fanout behaviour - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -760,7 +757,7 @@ fn test_fanout() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref message), + event: HandlerIn::Message(ref message), .. } => { let event = proto_to_message(message); @@ -813,7 +810,7 @@ fn test_inject_connected() { .iter() .filter(|e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref m), + event: HandlerIn::Message(ref m), .. } => !m.subscriptions.is_empty(), _ => false, @@ -823,7 +820,7 @@ fn test_inject_connected() { // check that there are two subscriptions sent to each peer for sevent in send_events.clone() { if let NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref m), + event: HandlerIn::Message(ref m), .. } = sevent { @@ -872,14 +869,14 @@ fn test_handle_received_subscriptions() { // The first peer sends 3 subscriptions and 1 unsubscription let mut subscriptions = topic_hashes[..3] .iter() - .map(|topic_hash| GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, + .map(|topic_hash| Subscription { + action: SubscriptionAction::Subscribe, topic_hash: topic_hash.clone(), }) - .collect::>(); + .collect::>(); - subscriptions.push(GossipsubSubscription { - action: GossipsubSubscriptionAction::Unsubscribe, + subscriptions.push(Subscription { + action: SubscriptionAction::Unsubscribe, topic_hash: topic_hashes[topic_hashes.len() - 1].clone(), }); @@ -920,8 +917,8 @@ fn test_handle_received_subscriptions() { // Peer 0 unsubscribes from the first topic gs.handle_received_subscriptions( - &[GossipsubSubscription { - action: GossipsubSubscriptionAction::Unsubscribe, + &[Subscription { + action: SubscriptionAction::Unsubscribe, topic_hash: topic_hashes[0].clone(), }], &peers[0], @@ -943,13 +940,13 @@ fn test_handle_received_subscriptions() { #[test] /// Test Gossipsub.get_random_peers() function fn test_get_random_peers() { - // generate a default GossipsubConfig - let gs_config = GossipsubConfigBuilder::default() + // generate a default Config + let gs_config = ConfigBuilder::default() .validation_mode(ValidationMode::Anonymous) .build() .unwrap(); // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config).unwrap(); + let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::Anonymous, gs_config).unwrap(); // create a topic and fill it with some peers let topic_hash = Topic::new("Test").hash(); @@ -1032,7 +1029,7 @@ fn test_handle_iwant_msg_cached() { .to_subscribe(true) .create_network(); - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(peers[11]), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), @@ -1059,7 +1056,7 @@ fn test_handle_iwant_msg_cached() { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { collected_messages.push(c.clone()) @@ -1090,7 +1087,7 @@ fn test_handle_iwant_msg_cached_shifted() { // perform 10 memshifts and check that it leaves the cache for shift in 1..10 { - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(peers[11]), data: vec![1, 2, 3, 4], sequence_number: Some(shift), @@ -1117,7 +1114,7 @@ fn test_handle_iwant_msg_cached_shifted() { // is the message is being sent? let message_exists = gs.events.iter().any(|e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref m), + event: HandlerIn::Message(ref m), .. } => { let event = proto_to_message(m); @@ -1180,7 +1177,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // check that we sent an IWANT request for `unknown id` let iwant_exists = match gs.control_pool.get(&peers[7]) { Some(controls) => controls.iter().any(|c| match c { - GossipsubControlAction::IWant { message_ids } => message_ids + ControlAction::IWant { message_ids } => message_ids .iter() .any(|m| *m == MessageId::new(b"unknown id")), _ => false, @@ -1347,8 +1344,8 @@ fn test_handle_prune_peer_in_mesh() { } fn count_control_msgs( - gs: &Gossipsub, - mut filter: impl FnMut(&PeerId, &GossipsubControlAction) -> bool, + gs: &Behaviour, + mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, ) -> usize { gs.control_pool .iter() @@ -1359,7 +1356,7 @@ fn count_control_msgs( .map(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, - event: GossipsubHandlerIn::Message(ref m), + event: HandlerIn::Message(ref m), .. } => { let event = proto_to_message(m); @@ -1374,7 +1371,7 @@ fn count_control_msgs( .sum::() } -fn flush_events(gs: &mut Gossipsub) { +fn flush_events(gs: &mut Behaviour) { gs.control_pool.clear(); gs.events.clear(); } @@ -1411,7 +1408,7 @@ fn test_explicit_peer_gets_connected() { #[test] fn test_explicit_peer_reconnects() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .check_explicit_peers_ticks(2) .build() .unwrap(); @@ -1471,7 +1468,7 @@ fn test_handle_graft_explicit_peer() { .peer_no(1) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); @@ -1487,7 +1484,7 @@ fn test_handle_graft_explicit_peer() { assert!( count_control_msgs(&gs, |peer_id, m| peer_id == peer && match m { - GossipsubControlAction::Prune { topic_hash, .. } => + ControlAction::Prune { topic_hash, .. } => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], _ => false, }) @@ -1502,7 +1499,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); @@ -1515,7 +1512,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] - && matches!(m, GossipsubControlAction::Graft { .. })) + && matches!(m, ControlAction::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); @@ -1523,7 +1520,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] - && matches!(m, GossipsubControlAction::Graft { .. })), + && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1535,7 +1532,7 @@ fn do_not_graft_explicit_peer() { .peer_no(1) .topics(vec![String::from("topic")]) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); @@ -1547,7 +1544,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &others[0] - && matches!(m, GossipsubControlAction::Graft { .. })), + && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1559,13 +1556,13 @@ fn do_forward_messages_to_explicit_peers() { .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); let local_id = PeerId::random(); - let message = RawGossipsubMessage { + let message = RawMessage { source: Some(peers[1]), data: vec![12], sequence_number: Some(0), @@ -1582,7 +1579,7 @@ fn do_forward_messages_to_explicit_peers() { .filter(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, - event: GossipsubHandlerIn::Message(ref m), + event: HandlerIn::Message(ref m), .. } => { let event = proto_to_message(m); @@ -1608,7 +1605,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { .peer_no(2) .topics(Vec::new()) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); @@ -1617,8 +1614,8 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { let topic_hash = topic.hash(); for peer in peers.iter().take(2) { gs.handle_received_subscriptions( - &[GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, + &[Subscription { + action: SubscriptionAction::Subscribe, topic_hash: topic_hash.clone(), }], peer, @@ -1634,7 +1631,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] - && matches!(m, GossipsubControlAction::Graft { .. })) + && matches!(m, ControlAction::Graft { .. })) > 0, "No graft message got created to non-explicit peer" ); @@ -1642,7 +1639,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] - && matches!(m, GossipsubControlAction::Graft { .. })), + && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1654,7 +1651,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { .peer_no(2) .topics(Vec::new()) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); @@ -1663,8 +1660,8 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { let topic_hash = topic.hash(); for peer in peers.iter().take(2) { gs.handle_received_subscriptions( - &[GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, + &[Subscription { + action: SubscriptionAction::Subscribe, topic_hash: topic_hash.clone(), }], peer, @@ -1683,7 +1680,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] - && matches!(m, GossipsubControlAction::Graft { .. })) + && matches!(m, ControlAction::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" ); @@ -1691,7 +1688,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] - && matches!(m, GossipsubControlAction::Graft { .. })), + && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" ); @@ -1703,13 +1700,13 @@ fn no_gossip_gets_sent_to_explicit_peers() { .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(1) .create_network(); let local_id = PeerId::random(); - let message = RawGossipsubMessage { + let message = RawMessage { source: Some(peers[1]), data: vec![], sequence_number: Some(0), @@ -1733,7 +1730,7 @@ fn no_gossip_gets_sent_to_explicit_peers() { .get(&peers[0]) .unwrap_or(&Vec::new()) .iter() - .filter(|m| matches!(m, GossipsubControlAction::IHave { .. })) + .filter(|m| matches!(m, ControlAction::IHave { .. })) .count(), 0, "Gossip got emitted to explicit peer" @@ -1743,7 +1740,7 @@ fn no_gossip_gets_sent_to_explicit_peers() { // Tests the mesh maintenance addition #[test] fn test_mesh_addition() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. let (mut gs, peers, topics) = inject_nodes1() @@ -1777,7 +1774,7 @@ fn test_mesh_addition() { // Tests the mesh maintenance subtraction #[test] fn test_mesh_subtraction() { - let config = GossipsubConfig::default(); + let config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. let n = config.mesh_n_high() + 10; @@ -1804,7 +1801,7 @@ fn test_mesh_subtraction() { #[test] fn test_connect_to_px_peers_on_handle_prune() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); let (mut gs, peers, topics) = inject_nodes1() .peer_no(1) @@ -1859,7 +1856,7 @@ fn test_connect_to_px_peers_on_handle_prune() { #[test] fn test_send_px_and_backoff_in_prune() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); //build mesh with enough peers for px let (mut gs, peers, topics) = inject_nodes1() @@ -1881,7 +1878,7 @@ fn test_send_px_and_backoff_in_prune() { assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] && match m { - GossipsubControlAction::Prune { + ControlAction::Prune { topic_hash, peers, backoff, @@ -1900,7 +1897,7 @@ fn test_send_px_and_backoff_in_prune() { #[test] fn test_prune_backoffed_peer_on_graft() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); //build mesh with enough peers for px let (mut gs, peers, topics) = inject_nodes1() @@ -1929,7 +1926,7 @@ fn test_prune_backoffed_peer_on_graft() { assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] && match m { - GossipsubControlAction::Prune { + ControlAction::Prune { topic_hash, peers, backoff, @@ -1946,7 +1943,7 @@ fn test_prune_backoffed_peer_on_graft() { #[test] fn test_do_not_graft_within_backoff_period() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) .build() @@ -1977,10 +1974,7 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!( - m, - GossipsubControlAction::Graft { .. } - )), + count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -1991,10 +1985,7 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!( - m, - GossipsubControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2002,7 +1993,7 @@ fn test_do_not_graft_within_backoff_period() { #[test] fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without_backoff() { //set default backoff period to 1 second - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .prune_backoff(Duration::from_millis(90)) .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) @@ -2032,10 +2023,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!( - m, - GossipsubControlAction::Graft { .. } - )), + count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2046,10 +2034,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!( - m, - GossipsubControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2057,7 +2042,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without #[test] fn test_unsubscribe_backoff() { const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100); - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .backoff_slack(1) // ensure a prune_backoff > unsubscribe_backoff .prune_backoff(Duration::from_secs(5)) @@ -2079,7 +2064,7 @@ fn test_unsubscribe_backoff() { assert_eq!( count_control_msgs(&gs, |_, m| match m { - GossipsubControlAction::Prune { backoff, .. } => backoff == &Some(1), + ControlAction::Prune { backoff, .. } => backoff == &Some(1), _ => false, }), 1, @@ -2103,10 +2088,7 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!( - m, - GossipsubControlAction::Graft { .. } - )), + count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2117,17 +2099,14 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!( - m, - GossipsubControlAction::Graft { .. } - )) > 0, + count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, "No graft message was created after backoff period" ); } #[test] fn test_flood_publish() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); let topic = "test"; // Adds more peers than mesh can hold to test flood publishing @@ -2147,7 +2126,7 @@ fn test_flood_publish() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -2171,7 +2150,7 @@ fn test_flood_publish() { let msg_id = gs.config.message_id(message); - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); assert_eq!( publishes.len(), config.mesh_n_high() + 10, @@ -2186,7 +2165,7 @@ fn test_flood_publish() { #[test] fn test_gossip_to_at_least_gossip_lazy_peers() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); //add more peers than in mesh to test gossipping //by default only mesh_n_low peers will get added to mesh @@ -2197,7 +2176,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { .create_network(); //receive message - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2219,7 +2198,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, |_, action| match action { - GossipsubControlAction::IHave { + ControlAction::IHave { topic_hash, message_ids, } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), @@ -2231,7 +2210,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { #[test] fn test_gossip_to_at_most_gossip_factor_peers() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); //add a lot of peers let m = config.mesh_n_low() + config.gossip_lazy() * (2.0 / config.gossip_factor()) as usize; @@ -2242,7 +2221,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { .create_network(); //receive message - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2263,7 +2242,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( count_control_msgs(&gs, |_, action| match action { - GossipsubControlAction::IHave { + ControlAction::IHave { topic_hash, message_ids, } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), @@ -2275,7 +2254,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { #[test] fn test_accept_only_outbound_peer_grafts_when_mesh_full() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); //enough peers to fill the mesh let (mut gs, peers, topics) = inject_nodes1() @@ -2315,7 +2294,7 @@ fn test_do_not_remove_too_many_outbound_peers() { //use an extreme case to catch errors with high probability let m = 50; let n = 2 * m; - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .mesh_n_high(n) .mesh_n(n) .mesh_n_low(n) @@ -2359,7 +2338,7 @@ fn test_do_not_remove_too_many_outbound_peers() { #[test] fn test_add_outbound_peers_if_min_is_not_satisfied() { - let config: GossipsubConfig = GossipsubConfig::default(); + let config: Config = Config::default(); // Fill full mesh with inbound peers let (mut gs, peers, topics) = inject_nodes1() @@ -2393,7 +2372,7 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { #[test] fn test_prune_negative_scored_peers() { - let config = GossipsubConfig::default(); + let config = Config::default(); //build mesh with one peer let (mut gs, peers, topics) = inject_nodes1() @@ -2422,7 +2401,7 @@ fn test_prune_negative_scored_peers() { assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] && match m { - GossipsubControlAction::Prune { + ControlAction::Prune { topic_hash, peers, backoff, @@ -2439,7 +2418,7 @@ fn test_prune_negative_scored_peers() { #[test] fn test_dont_graft_to_negative_scored_peers() { - let config = GossipsubConfig::default(); + let config = Config::default(); //init full mesh let (mut gs, peers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) @@ -2477,7 +2456,7 @@ fn test_dont_graft_to_negative_scored_peers() { /// peers should get ignored, therefore we test it here. #[test] fn test_ignore_px_from_negative_scored_peer() { - let config = GossipsubConfig::default(); + let config = Config::default(); //build mesh with one peer let (mut gs, peers, topics) = inject_nodes1() @@ -2520,7 +2499,7 @@ fn test_ignore_px_from_negative_scored_peer() { #[test] fn test_only_send_nonnegative_scoring_peers_in_px() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .prune_peers(16) .do_px() .build() @@ -2556,7 +2535,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] && match m { - GossipsubControlAction::Prune { + ControlAction::Prune { topic_hash, peers: px, .. @@ -2572,7 +2551,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { #[test] fn test_do_not_gossip_to_peers_below_gossip_threshold() { - let config = GossipsubConfig::default(); + let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { gossip_threshold: 3.0 * peer_score_params.behaviour_penalty_weight, @@ -2606,7 +2585,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); // Receive message - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2628,7 +2607,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( count_control_msgs(&gs, |peer, action| match action { - GossipsubControlAction::IHave { + ControlAction::IHave { topic_hash, message_ids, } => { @@ -2647,7 +2626,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { #[test] fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { - let config = GossipsubConfig::default(); + let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { gossip_threshold: 3.0 * peer_score_params.behaviour_penalty_weight, @@ -2683,7 +2662,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); // Receive message - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2708,7 +2687,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { collected_messages.push((*peer_id, c.clone())) @@ -2739,7 +2718,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { #[test] fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { - let config = GossipsubConfig::default(); + let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { gossip_threshold: 3.0 * peer_score_params.behaviour_penalty_weight, @@ -2774,7 +2753,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); //message that other peers have - let raw_message = RawGossipsubMessage { + let raw_message = RawMessage { source: Some(PeerId::random()), data: vec![], sequence_number: Some(0), @@ -2795,7 +2774,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( count_control_msgs(&gs, |peer, c| match c { - GossipsubControlAction::IWant { message_ids } => + ControlAction::IWant { message_ids } => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); true @@ -2810,7 +2789,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { #[test] fn test_do_not_publish_to_peer_below_publish_threshold() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -2856,7 +2835,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push((*peer_id, s.clone())); @@ -2874,7 +2853,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { #[test] fn test_do_not_flood_publish_to_peer_below_publish_threshold() { - let config = GossipsubConfig::default(); + let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { gossip_threshold: 0.5 * peer_score_params.behaviour_penalty_weight, @@ -2913,7 +2892,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push((*peer_id, s.clone())); @@ -2931,7 +2910,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { #[test] fn test_ignore_rpc_from_peers_below_graylist_threshold() { - let config = GossipsubConfig::default(); + let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { gossip_threshold: 0.5 * peer_score_params.behaviour_penalty_weight, @@ -2959,7 +2938,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { //reduce score of p2 below publish_threshold but not below graylist_threshold gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); - let raw_message1 = RawGossipsubMessage { + let raw_message1 = RawMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4], sequence_number: Some(1u64), @@ -2969,7 +2948,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { validated: true, }; - let raw_message2 = RawGossipsubMessage { + let raw_message2 = RawMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5], sequence_number: Some(2u64), @@ -2979,7 +2958,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { validated: true, }; - let raw_message3 = RawGossipsubMessage { + let raw_message3 = RawMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6], sequence_number: Some(3u64), @@ -2989,7 +2968,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { validated: true, }; - let raw_message4 = RawGossipsubMessage { + let raw_message4 = RawMessage { source: Some(PeerId::random()), data: vec![1, 2, 3, 4, 5, 6, 7], sequence_number: Some(4u64), @@ -3005,12 +2984,12 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { // Transform the inbound message let message4 = &gs.data_transform.inbound_transform(raw_message4).unwrap(); - let subscription = GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, + let subscription = Subscription { + action: SubscriptionAction::Subscribe, topic_hash: topics[0].clone(), }; - let control_action = GossipsubControlAction::IHave { + let control_action = ControlAction::IHave { topic_hash: topics[0].clone(), message_ids: vec![config.message_id(message2)], }; @@ -3024,7 +3003,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { #[allow(deprecated)] ConnectionId::DUMMY, HandlerEvent::Message { - rpc: GossipsubRpc { + rpc: Rpc { messages: vec![raw_message1], subscriptions: vec![subscription.clone()], control_msgs: vec![control_action], @@ -3037,10 +3016,10 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { assert_eq!(gs.events.len(), 1); assert!(matches!( gs.events[0], - NetworkBehaviourAction::GenerateEvent(GossipsubEvent::Subscribed { .. }) + NetworkBehaviourAction::GenerateEvent(Event::Subscribed { .. }) )); - let control_action = GossipsubControlAction::IHave { + let control_action = ControlAction::IHave { topic_hash: topics[0].clone(), message_ids: vec![config.message_id(message4)], }; @@ -3051,7 +3030,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { #[allow(deprecated)] ConnectionId::DUMMY, HandlerEvent::Message { - rpc: GossipsubRpc { + rpc: Rpc { messages: vec![raw_message3], subscriptions: vec![subscription], control_msgs: vec![control_action], @@ -3066,10 +3045,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { #[test] fn test_ignore_px_from_peers_below_accept_px_threshold() { - let config = GossipsubConfigBuilder::default() - .prune_peers(16) - .build() - .unwrap(); + let config = ConfigBuilder::default().prune_peers(16).build().unwrap(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { accept_px_threshold: peer_score_params.app_specific_weight, @@ -3137,7 +3113,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { #[test] fn test_keep_best_scoring_peers_on_oversubscription() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .mesh_n_low(15) .mesh_n(30) .mesh_n_high(60) @@ -3190,7 +3166,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { #[test] fn test_scoring_p1() { - let config = GossipsubConfig::default(); + let config = Config::default(); let mut peer_score_params = PeerScoreParams::default(); let topic = Topic::new("test"); let topic_hash = topic.hash(); @@ -3255,10 +3231,10 @@ fn test_scoring_p1() { ); } -fn random_message(seq: &mut u64, topics: &Vec) -> RawGossipsubMessage { +fn random_message(seq: &mut u64, topics: &Vec) -> RawMessage { let mut rng = rand::thread_rng(); *seq += 1; - RawGossipsubMessage { + RawMessage { source: Some(PeerId::random()), data: (0..rng.gen_range(10..30)) .into_iter() @@ -3274,7 +3250,7 @@ fn random_message(seq: &mut u64, topics: &Vec) -> RawGossipsubMessage #[test] fn test_scoring_p2() { - let config = GossipsubConfig::default(); + let config = Config::default(); let mut peer_score_params = PeerScoreParams::default(); let topic = Topic::new("test"); let topic_hash = topic.hash(); @@ -3303,7 +3279,7 @@ fn test_scoring_p2() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3372,7 +3348,7 @@ fn test_scoring_p2() { #[test] fn test_scoring_p3() { - let config = GossipsubConfig::default(); + let config = Config::default(); let mut peer_score_params = PeerScoreParams::default(); let topic = Topic::new("test"); let topic_hash = topic.hash(); @@ -3403,7 +3379,7 @@ fn test_scoring_p3() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3467,7 +3443,7 @@ fn test_scoring_p3() { #[test] fn test_scoring_p3b() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .prune_backoff(Duration::from_millis(100)) .build() .unwrap(); @@ -3504,7 +3480,7 @@ fn test_scoring_p3b() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3563,7 +3539,7 @@ fn test_scoring_p3b() { #[test] fn test_scoring_p4_valid_message() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3596,7 +3572,7 @@ fn test_scoring_p4_valid_message() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3622,7 +3598,7 @@ fn test_scoring_p4_valid_message() { #[test] fn test_scoring_p4_invalid_signature() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3664,7 +3640,7 @@ fn test_scoring_p4_invalid_signature() { #[allow(deprecated)] ConnectionId::DUMMY, HandlerEvent::Message { - rpc: GossipsubRpc { + rpc: Rpc { messages: vec![], subscriptions: vec![], control_msgs: vec![], @@ -3681,7 +3657,7 @@ fn test_scoring_p4_invalid_signature() { #[test] fn test_scoring_p4_message_from_self() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3714,7 +3690,7 @@ fn test_scoring_p4_message_from_self() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3731,7 +3707,7 @@ fn test_scoring_p4_message_from_self() { #[test] fn test_scoring_p4_ignored_message() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3764,7 +3740,7 @@ fn test_scoring_p4_ignored_message() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3790,7 +3766,7 @@ fn test_scoring_p4_ignored_message() { #[test] fn test_scoring_p4_application_invalidated_message() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3823,7 +3799,7 @@ fn test_scoring_p4_application_invalidated_message() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3852,7 +3828,7 @@ fn test_scoring_p4_application_invalidated_message() { #[test] fn test_scoring_p4_application_invalid_message_from_two_peers() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3885,7 +3861,7 @@ fn test_scoring_p4_application_invalid_message_from_two_peers() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -3922,7 +3898,7 @@ fn test_scoring_p4_application_invalid_message_from_two_peers() { #[test] fn test_scoring_p4_three_application_invalid_messages() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -3955,7 +3931,7 @@ fn test_scoring_p4_three_application_invalid_messages() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -4006,7 +3982,7 @@ fn test_scoring_p4_three_application_invalid_messages() { #[test] fn test_scoring_p4_decay() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .validate_messages() .build() .unwrap(); @@ -4039,7 +4015,7 @@ fn test_scoring_p4_decay() { .create_network(); let mut seq = 0; - let deliver_message = |gs: &mut Gossipsub, index: usize, msg: RawGossipsubMessage| { + let deliver_message = |gs: &mut Behaviour, index: usize, msg: RawMessage| { gs.handle_received_message(msg, &peers[index]); }; @@ -4086,7 +4062,7 @@ fn test_scoring_p5() { .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(0) .outbound(0) .scoring(Some((peer_score_params, PeerScoreThresholds::default()))) @@ -4112,7 +4088,7 @@ fn test_scoring_p6() { .peer_no(0) .topics(vec![]) .to_subscribe(false) - .gs_config(GossipsubConfig::default()) + .gs_config(Config::default()) .explicit(0) .outbound(0) .scoring(Some((peer_score_params, PeerScoreThresholds::default()))) @@ -4230,7 +4206,7 @@ fn test_scoring_p6() { #[test] fn test_scoring_p7_grafts_before_backoff() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .prune_backoff(Duration::from_millis(200)) .graft_flood_threshold(Duration::from_millis(100)) .build() @@ -4300,7 +4276,7 @@ fn test_scoring_p7_grafts_before_backoff() { #[test] fn test_opportunistic_grafting() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .mesh_n_low(3) .mesh_n(5) .mesh_n_high(7) @@ -4407,10 +4383,7 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, |_, a| matches!( - a, - GossipsubControlAction::Prune { .. } - )), + count_control_msgs(&gs, |_, a| matches!(a, ControlAction::Prune { .. })), 0, "we should not prune after graft in unknown topic" ); @@ -4418,7 +4391,7 @@ fn test_ignore_graft_from_unknown_topic() { #[test] fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { - let config = GossipsubConfig::default(); + let config = Config::default(); //build gossipsub with full mesh let (mut gs, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) @@ -4454,7 +4427,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { .iter() .map(|e| match e { NetworkBehaviourAction::NotifyHandler { - event: GossipsubHandlerIn::Message(ref m), + event: HandlerIn::Message(ref m), .. } => { let event = proto_to_message(m); @@ -4470,7 +4443,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { #[test] fn test_ignore_too_many_ihaves() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .max_ihave_messages(10) .build() .unwrap(); @@ -4513,7 +4486,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( count_control_msgs(&gs, |p, action| p == &peer - && matches!(action, GossipsubControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), + && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); @@ -4536,7 +4509,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( count_control_msgs(&gs, |p, action| p == &peer - && matches!(action, GossipsubControlAction::IWant { message_ids } if message_ids.len() == 1)), + && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)), 20, "all 20 should get sent" ); @@ -4544,7 +4517,7 @@ fn test_ignore_too_many_ihaves() { #[test] fn test_ignore_too_many_messages_in_ihave() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .max_ihave_messages(10) .max_ihave_length(10) .build() @@ -4585,7 +4558,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, |p, action| match action { - GossipsubControlAction::IWant { message_ids } => + ControlAction::IWant { message_ids } => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); sum += message_ids.len(); @@ -4610,7 +4583,7 @@ fn test_ignore_too_many_messages_in_ihave() { let mut sum = 0; assert_eq!( count_control_msgs(&gs, |p, action| match action { - GossipsubControlAction::IWant { message_ids } => + ControlAction::IWant { message_ids } => p == &peer && { sum += message_ids.len(); true @@ -4624,7 +4597,7 @@ fn test_ignore_too_many_messages_in_ihave() { #[test] fn test_limit_number_of_message_ids_inside_ihave() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .max_ihave_messages(10) .max_ihave_length(100) .build() @@ -4664,7 +4637,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { assert_eq!( count_control_msgs(&gs, |p, action| match action { - GossipsubControlAction::IHave { message_ids, .. } => { + ControlAction::IHave { message_ids, .. } => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); true @@ -4707,7 +4680,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { fn test_iwant_penalties() { let _ = env_logger::try_init(); - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .iwant_followup_time(Duration::from_secs(4)) .build() .unwrap(); @@ -4825,7 +4798,7 @@ fn test_iwant_penalties() { #[test] fn test_publish_to_floodsub_peers_without_flood_publish() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -4861,7 +4834,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -4882,7 +4855,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { #[test] fn test_do_not_use_floodsub_in_fanout() { - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .flood_publish(false) .build() .unwrap(); @@ -4918,7 +4891,7 @@ fn test_do_not_use_floodsub_in_fanout() { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let GossipsubHandlerIn::Message(ref m) = event { + if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -5000,7 +4973,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( count_control_msgs(&gs, |_, m| match m { - GossipsubControlAction::Prune { peers: px, .. } => !px.is_empty(), + ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, }), 0, @@ -5038,7 +5011,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( count_control_msgs(&gs, |_, m| match m { - GossipsubControlAction::Prune { peers: px, .. } => !px.is_empty(), + ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, }), 0, @@ -5138,7 +5111,7 @@ fn test_msg_id_fn_only_called_once_with_fast_message_ids() { }}; } - let message_id_fn = |m: &GossipsubMessage| -> MessageId { + let message_id_fn = |m: &Message| -> MessageId { let (mut id, mut counters_pointer): (MessageId, *mut Pointers) = get_counters_and_hash!(&m.data); unsafe { @@ -5147,14 +5120,14 @@ fn test_msg_id_fn_only_called_once_with_fast_message_ids() { id.0.reverse(); id }; - let fast_message_id_fn = |m: &RawGossipsubMessage| -> FastMessageId { + let fast_message_id_fn = |m: &RawMessage| -> FastMessageId { let (id, mut counters_pointer) = get_counters_and_hash!(&m.data); unsafe { (*counters_pointer).fast_counter += 1; } id }; - let config = GossipsubConfigBuilder::default() + let config = ConfigBuilder::default() .message_id_fn(message_id_fn) .fast_message_id_fn(fast_message_id_fn) .build() @@ -5166,7 +5139,7 @@ fn test_msg_id_fn_only_called_once_with_fast_message_ids() { .gs_config(config) .create_network(); - let message = RawGossipsubMessage { + let message = RawMessage { source: None, data: counters_address.to_be_bytes().to_vec(), sequence_number: None, @@ -5228,12 +5201,12 @@ fn test_subscribe_and_graft_with_negative_score() { //subscribe to topic in gs2 gs2.subscribe(&topic).unwrap(); - let forward_messages_to_p1 = |gs1: &mut Gossipsub<_, _>, gs2: &mut Gossipsub<_, _>| { + let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| { //collect messages to p1 let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == p1 { - if let GossipsubHandlerIn::Message(m) = event { + if let HandlerIn::Message(m) = event { Some(m) } else { None diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 0b31dd74724..37e7764e077 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -24,7 +24,7 @@ use std::time::Duration; use libp2p_core::PeerId; -use crate::types::{FastMessageId, GossipsubMessage, MessageId, RawGossipsubMessage}; +use crate::types::{FastMessageId, Message, MessageId, RawMessage}; /// The types of message validation that can be employed by gossipsub. #[derive(Debug, Clone)] @@ -51,16 +51,16 @@ pub enum ValidationMode { /// Selector for custom Protocol Id #[derive(Clone, Debug, PartialEq, Eq)] -pub enum GossipsubVersion { +pub enum Version { V1_0, V1_1, } /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] -pub struct GossipsubConfig { +pub struct Config { protocol_id: Cow<'static, str>, - custom_id_version: Option, + custom_id_version: Option, history_length: usize, history_gossip: usize, mesh_n: usize, @@ -78,9 +78,8 @@ pub struct GossipsubConfig { duplicate_cache_time: Duration, validate_messages: bool, validation_mode: ValidationMode, - message_id_fn: Arc MessageId + Send + Sync + 'static>, - fast_message_id_fn: - Option FastMessageId + Send + Sync + 'static>>, + message_id_fn: Arc MessageId + Send + Sync + 'static>, + fast_message_id_fn: Option FastMessageId + Send + Sync + 'static>>, allow_self_origin: bool, do_px: bool, prune_peers: usize, @@ -101,22 +100,22 @@ pub struct GossipsubConfig { published_message_ids_cache_time: Duration, } -impl GossipsubConfig { +impl Config { // All the getters /// The protocol id to negotiate this protocol. By default, the resulting protocol id has the form - /// `//`, but can optionally be changed to a literal form by providing some GossipsubVersion as custom_id_version. + /// `//`, but can optionally be changed to a literal form by providing some Version as custom_id_version. /// As gossipsub supports version 1.0 and 1.1, there are two suffixes supported for the resulting protocol id. /// - /// Calling `GossipsubConfigBuilder::protocol_id_prefix` will set a new prefix and retain the prefix logic. - /// Calling `GossipsubConfigBuilder::protocol_id` will set a custom `protocol_id` and disable the prefix logic. + /// Calling [`ConfigBuilder::protocol_id_prefix`] will set a new prefix and retain the prefix logic. + /// Calling [`ConfigBuilder::protocol_id`] will set a custom `protocol_id` and disable the prefix logic. /// /// The default prefix is `meshsub`, giving the supported protocol ids: `/meshsub/1.1.0` and `/meshsub/1.0.0`, negotiated in that order. pub fn protocol_id(&self) -> &Cow<'static, str> { &self.protocol_id } - pub fn custom_id_version(&self) -> &Option { + pub fn custom_id_version(&self) -> &Option { &self.custom_id_version } @@ -217,7 +216,7 @@ impl GossipsubConfig { /// When set to `true`, prevents automatic forwarding of all received messages. This setting /// allows a user to validate the messages before propagating them to their peers. If set to - /// true, the user must manually call [`crate::Gossipsub::report_message_validation_result()`] + /// true, the user must manually call [`crate::Behaviour::report_message_validation_result()`] /// on the behaviour to forward message once validated (default is `false`). /// The default is `false`. pub fn validate_messages(&self) -> bool { @@ -236,21 +235,21 @@ impl GossipsubConfig { /// addressing, where this function may be set to `hash(message)`. This would prevent messages /// of the same content from being duplicated. /// - /// The function takes a [`GossipsubMessage`] as input and outputs a String to be interpreted as + /// The function takes a [`Message`] as input and outputs a String to be interpreted as /// the message id. - pub fn message_id(&self, message: &GossipsubMessage) -> MessageId { + pub fn message_id(&self, message: &Message) -> MessageId { (self.message_id_fn)(message) } /// A user-defined optional function that computes fast ids from raw messages. This can be used - /// to avoid possibly expensive transformations from [`RawGossipsubMessage`] to - /// [`GossipsubMessage`] for duplicates. Two semantically different messages must always + /// to avoid possibly expensive transformations from [`RawMessage`] to + /// [`Message`] for duplicates. Two semantically different messages must always /// have different fast message ids, but it is allowed that two semantically identical messages /// have different fast message ids as long as the message_id_fn produces the same id for them. /// - /// The function takes a [`RawGossipsubMessage`] as input and outputs a String to be + /// The function takes a [`RawMessage`] as input and outputs a String to be /// interpreted as the fast message id. Default is None. - pub fn fast_message_id(&self, message: &RawGossipsubMessage) -> Option { + pub fn fast_message_id(&self, message: &RawMessage) -> Option { self.fast_message_id_fn .as_ref() .map(|fast_message_id_fn| fast_message_id_fn(message)) @@ -391,24 +390,24 @@ impl GossipsubConfig { } } -impl Default for GossipsubConfig { +impl Default for Config { fn default() -> Self { // use ConfigBuilder to also validate defaults - GossipsubConfigBuilder::default() + ConfigBuilder::default() .build() .expect("Default config parameters should be valid parameters") } } /// The builder struct for constructing a gossipsub configuration. -pub struct GossipsubConfigBuilder { - config: GossipsubConfig, +pub struct ConfigBuilder { + config: Config, } -impl Default for GossipsubConfigBuilder { +impl Default for ConfigBuilder { fn default() -> Self { - GossipsubConfigBuilder { - config: GossipsubConfig { + ConfigBuilder { + config: Config { protocol_id: Cow::Borrowed("meshsub"), custom_id_version: None, history_length: 5, @@ -466,13 +465,13 @@ impl Default for GossipsubConfigBuilder { } } -impl From for GossipsubConfigBuilder { - fn from(config: GossipsubConfig) -> Self { - GossipsubConfigBuilder { config } +impl From for ConfigBuilder { + fn from(config: Config) -> Self { + ConfigBuilder { config } } } -impl GossipsubConfigBuilder { +impl ConfigBuilder { /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`). pub fn protocol_id_prefix( &mut self, @@ -487,7 +486,7 @@ impl GossipsubConfigBuilder { pub fn protocol_id( &mut self, protocol_id: impl Into>, - custom_id_version: GossipsubVersion, + custom_id_version: Version, ) -> &mut Self { self.config.custom_id_version = Some(custom_id_version); self.config.protocol_id = protocol_id.into(); @@ -600,7 +599,7 @@ impl GossipsubConfigBuilder { /// When set, prevents automatic forwarding of all received messages. This setting /// allows a user to validate the messages before propagating them to their peers. If set, - /// the user must manually call [`crate::Gossipsub::report_message_validation_result()`] on the + /// the user must manually call [`crate::Behaviour::report_message_validation_result()`] on the /// behaviour to forward a message once validated. pub fn validate_messages(&mut self) -> &mut Self { self.config.validate_messages = true; @@ -620,27 +619,27 @@ impl GossipsubConfigBuilder { /// addressing, where this function may be set to `hash(message)`. This would prevent messages /// of the same content from being duplicated. /// - /// The function takes a [`GossipsubMessage`] as input and outputs a String to be + /// The function takes a [`Message`] as input and outputs a String to be /// interpreted as the message id. pub fn message_id_fn(&mut self, id_fn: F) -> &mut Self where - F: Fn(&GossipsubMessage) -> MessageId + Send + Sync + 'static, + F: Fn(&Message) -> MessageId + Send + Sync + 'static, { self.config.message_id_fn = Arc::new(id_fn); self } /// A user-defined optional function that computes fast ids from raw messages. This can be used - /// to avoid possibly expensive transformations from [`RawGossipsubMessage`] to - /// [`GossipsubMessage`] for duplicates. Two semantically different messages must always + /// to avoid possibly expensive transformations from [`RawMessage`] to + /// [`Message`] for duplicates. Two semantically different messages must always /// have different fast message ids, but it is allowed that two semantically identical messages /// have different fast message ids as long as the message_id_fn produces the same id for them. /// - /// The function takes a [`RawGossipsubMessage`] as input and outputs a String to be interpreted + /// The function takes a [`Message`] as input and outputs a String to be interpreted /// as the fast message id. Default is None. pub fn fast_message_id_fn(&mut self, fast_id_fn: F) -> &mut Self where - F: Fn(&RawGossipsubMessage) -> FastMessageId + Send + Sync + 'static, + F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static, { self.config.fast_message_id_fn = Some(Arc::new(fast_id_fn)); self @@ -801,8 +800,8 @@ impl GossipsubConfigBuilder { self } - /// Constructs a [`GossipsubConfig`] from the given configuration and validates the settings. - pub fn build(&self) -> Result { + /// Constructs a [`Config`] from the given configuration and validates the settings. + pub fn build(&self) -> Result { // check all constraints on config if self.config.max_transmit_size < 100 { @@ -838,7 +837,7 @@ impl GossipsubConfigBuilder { } } -impl std::fmt::Debug for GossipsubConfig { +impl std::fmt::Debug for Config { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); let _ = builder.field("protocol_id", &self.protocol_id); @@ -895,7 +894,7 @@ mod test { #[test] fn create_thing() { - let builder: GossipsubConfig = GossipsubConfigBuilder::default() + let builder: Config = ConfigBuilder::default() .protocol_id_prefix("purple") .build() .unwrap(); @@ -903,8 +902,8 @@ mod test { dbg!(builder); } - fn get_gossipsub_message() -> GossipsubMessage { - GossipsubMessage { + fn get_gossipsub_message() -> Message { + Message { source: None, data: vec![12, 34, 56], sequence_number: None, @@ -918,7 +917,7 @@ mod test { ]) } - fn message_id_plain_function(message: &GossipsubMessage) -> MessageId { + fn message_id_plain_function(message: &Message) -> MessageId { let mut s = DefaultHasher::new(); message.data.hash(&mut s); let mut v = s.finish().to_string(); @@ -928,7 +927,7 @@ mod test { #[test] fn create_config_with_message_id_as_plain_function() { - let builder: GossipsubConfig = GossipsubConfigBuilder::default() + let builder: Config = ConfigBuilder::default() .protocol_id_prefix("purple") .message_id_fn(message_id_plain_function) .build() @@ -940,7 +939,7 @@ mod test { #[test] fn create_config_with_message_id_as_closure() { - let closure = |message: &GossipsubMessage| { + let closure = |message: &Message| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); let mut v = s.finish().to_string(); @@ -948,7 +947,7 @@ mod test { MessageId::from(v) }; - let builder: GossipsubConfig = GossipsubConfigBuilder::default() + let builder: Config = ConfigBuilder::default() .protocol_id_prefix("purple") .message_id_fn(closure) .build() @@ -961,7 +960,7 @@ mod test { #[test] fn create_config_with_message_id_as_closure_with_variable_capture() { let captured: char = 'e'; - let closure = move |message: &GossipsubMessage| { + let closure = move |message: &Message| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); let mut v = s.finish().to_string(); @@ -969,7 +968,7 @@ mod test { MessageId::from(v) }; - let builder: GossipsubConfig = GossipsubConfigBuilder::default() + let builder: Config = ConfigBuilder::default() .protocol_id_prefix("purple") .message_id_fn(closure) .build() @@ -981,7 +980,7 @@ mod test { #[test] fn create_config_with_protocol_id_prefix() { - let builder: GossipsubConfig = GossipsubConfigBuilder::default() + let builder: Config = ConfigBuilder::default() .protocol_id_prefix("purple") .validation_mode(ValidationMode::Anonymous) .message_id_fn(message_id_plain_function) @@ -1005,15 +1004,15 @@ mod test { #[test] fn create_config_with_custom_protocol_id() { - let builder: GossipsubConfig = GossipsubConfigBuilder::default() - .protocol_id("purple", GossipsubVersion::V1_0) + let builder: Config = ConfigBuilder::default() + .protocol_id("purple", Version::V1_0) .validation_mode(ValidationMode::Anonymous) .message_id_fn(message_id_plain_function) .build() .unwrap(); assert_eq!(builder.protocol_id(), "purple"); - assert_eq!(builder.custom_id_version(), &Some(GossipsubVersion::V1_0)); + assert_eq!(builder.custom_id_version(), &Some(Version::V1_0)); let protocol_config = ProtocolConfig::new(&builder); let protocol_ids = protocol_config.protocol_info(); diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs index 1ebca61eaa6..feb9231b662 100644 --- a/protocols/gossipsub/src/error.rs +++ b/protocols/gossipsub/src/error.rs @@ -86,9 +86,15 @@ impl From for PublishError { } } +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::HandlerError" +)] +pub type GossipsubHandlerError = HandlerError; + /// Errors that can occur in the protocols handler. #[derive(Debug, Error)] -pub enum GossipsubHandlerError { +pub enum HandlerError { #[error("The maximum number of inbound substreams created has been exceeded.")] MaxInboundSubstreams, #[error("The maximum number of outbound substreams created has been exceeded.")] @@ -134,9 +140,9 @@ impl std::fmt::Display for ValidationError { impl std::error::Error for ValidationError {} -impl From for GossipsubHandlerError { - fn from(error: std::io::Error) -> GossipsubHandlerError { - GossipsubHandlerError::Codec(prost_codec::Error::from(error)) +impl From for HandlerError { + fn from(error: std::io::Error) -> HandlerError { + HandlerError::Codec(prost_codec::Error::from(error)) } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 8fd563c37a6..39a65e7455a 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -18,9 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::error::{GossipsubHandlerError, ValidationError}; +use crate::error::{HandlerError, ValidationError}; use crate::protocol::{GossipsubCodec, ProtocolConfig}; -use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage}; +use crate::types::{PeerKind, RawMessage, Rpc}; use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; @@ -53,10 +53,10 @@ pub enum HandlerEvent { /// any) that were received. Message { /// The GossipsubRPC message excluding any invalid messages. - rpc: GossipsubRpc, + rpc: Rpc, /// Any invalid messages that were received in the RPC, along with the associated /// validation error. - invalid_messages: Vec<(RawGossipsubMessage, ValidationError)>, + invalid_messages: Vec<(RawMessage, ValidationError)>, }, /// An inbound or outbound substream has been established with the peer and this informs over /// which protocol. This message only occurs once per connection. @@ -65,7 +65,7 @@ pub enum HandlerEvent { /// A message sent from the behaviour to the handler. #[derive(Debug)] -pub enum GossipsubHandlerIn { +pub enum HandlerIn { /// A gossipsub message to send. Message(crate::rpc_proto::Rpc), /// The peer has joined the mesh. @@ -82,7 +82,7 @@ pub enum GossipsubHandlerIn { const MAX_SUBSTREAM_CREATION: usize = 5; /// Protocol Handler that manages a single long-lived substream with a peer. -pub struct GossipsubHandler { +pub struct Handler { /// Upgrade configuration for the gossipsub protocol. listen_protocol: SubstreamProtocol, @@ -124,7 +124,7 @@ pub struct GossipsubHandler { idle_timeout: Duration, /// Collection of errors from attempting an upgrade. - upgrade_errors: VecDeque>, + upgrade_errors: VecDeque>, /// Flag determining whether to maintain the connection to the peer. keep_alive: KeepAlive, @@ -161,10 +161,10 @@ enum OutboundSubstreamState { Poisoned, } -impl GossipsubHandler { - /// Builds a new [`GossipsubHandler`]. +impl Handler { + /// Builds a new [`Handler`]. pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self { - GossipsubHandler { + Handler { listen_protocol: SubstreamProtocol::new(protocol_config, ()), inbound_substream: None, outbound_substream: None, @@ -245,10 +245,10 @@ impl GossipsubHandler { } } -impl ConnectionHandler for GossipsubHandler { - type InEvent = GossipsubHandlerIn; +impl ConnectionHandler for Handler { + type InEvent = HandlerIn; type OutEvent = HandlerEvent; - type Error = GossipsubHandlerError; + type Error = HandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; type OutboundOpenInfo = crate::rpc_proto::Rpc; @@ -258,17 +258,17 @@ impl ConnectionHandler for GossipsubHandler { self.listen_protocol.clone() } - fn on_behaviour_event(&mut self, message: GossipsubHandlerIn) { + fn on_behaviour_event(&mut self, message: HandlerIn) { if !self.protocol_unsupported { match message { - GossipsubHandlerIn::Message(m) => self.send_queue.push(m), + HandlerIn::Message(m) => self.send_queue.push(m), // If we have joined the mesh, keep the connection alive. - GossipsubHandlerIn::JoinedMesh => { + HandlerIn::JoinedMesh => { self.in_mesh = true; self.keep_alive = KeepAlive::Yes; } // If we have left the mesh, start the idle timer. - GossipsubHandlerIn::LeftMesh => { + HandlerIn::LeftMesh => { self.in_mesh = false; self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout); } @@ -296,7 +296,7 @@ impl ConnectionHandler for GossipsubHandler { let reported_error = match error { // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - Some(GossipsubHandlerError::NegotiationTimeout) + Some(HandlerError::NegotiationTimeout) } // There was an error post negotiation, close the connection. ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), @@ -319,7 +319,7 @@ impl ConnectionHandler for GossipsubHandler { } } NegotiationError::ProtocolError(e) => { - Some(GossipsubHandlerError::NegotiationProtocolError(e)) + Some(HandlerError::NegotiationProtocolError(e)) } } } @@ -343,7 +343,7 @@ impl ConnectionHandler for GossipsubHandler { if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, end the connection. return Poll::Ready(ConnectionHandlerEvent::Close( - GossipsubHandlerError::MaxInboundSubstreams, + HandlerError::MaxInboundSubstreams, )); } @@ -354,7 +354,7 @@ impl ConnectionHandler for GossipsubHandler { { if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { return Poll::Ready(ConnectionHandlerEvent::Close( - GossipsubHandlerError::MaxOutboundSubstreams, + HandlerError::MaxOutboundSubstreams, )); } let message = self.send_queue.remove(0); @@ -384,7 +384,7 @@ impl ConnectionHandler for GossipsubHandler { } Poll::Ready(Some(Err(error))) => { match error { - GossipsubHandlerError::MaxTransmissionSize => { + HandlerError::MaxTransmissionSize => { warn!("Message exceeded the maximum transmission size"); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); @@ -471,7 +471,7 @@ impl ConnectionHandler for GossipsubHandler { self.outbound_substream = Some(OutboundSubstreamState::PendingFlush(substream)) } - Err(GossipsubHandlerError::MaxTransmissionSize) => { + Err(HandlerError::MaxTransmissionSize) => { error!("Message exceeded the maximum transmission size and was not sent."); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index ad2c1f1fbf0..3e5eaa641ff 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -67,31 +67,31 @@ //! //! # Using Gossipsub //! -//! ## GossipsubConfig +//! ## Gossipsub Config //! -//! The [`GossipsubConfig`] struct specifies various network performance/tuning configuration +//! The [`Config`] struct specifies various network performance/tuning configuration //! parameters. Specifically it specifies: //! -//! [`GossipsubConfig`]: struct.Config.html +//! [`Config`]: struct.Config.html //! //! This struct implements the [`Default`] trait and can be initialised via -//! [`GossipsubConfig::default()`]. +//! [`Config::default()`]. //! //! -//! ## Gossipsub +//! ## Behaviour //! -//! The [`Gossipsub`] struct implements the [`libp2p_swarm::NetworkBehaviour`] trait allowing it to +//! The [`Behaviour`] struct implements the [`libp2p_swarm::NetworkBehaviour`] trait allowing it to //! act as the routing behaviour in a [`libp2p_swarm::Swarm`]. This struct requires an instance of -//! [`libp2p_core::PeerId`] and [`GossipsubConfig`]. +//! [`libp2p_core::PeerId`] and [`Config`]. //! -//! [`Gossipsub`]: struct.Gossipsub.html +//! [`Behaviour`]: struct.Behaviour.html //! ## Example //! //! An example of initialising a gossipsub compatible swarm: //! //! ``` -//! use libp2p_gossipsub::GossipsubEvent; +//! use libp2p_gossipsub::Event; //! use libp2p_core::{identity::Keypair,transport::{Transport, MemoryTransport}, Multiaddr}; //! use libp2p_gossipsub::MessageAuthenticity; //! let local_key = Keypair::generate_ed25519(); @@ -115,10 +115,10 @@ //! // Create a Swarm to manage peers and events //! let mut swarm = { //! // set default parameters for gossipsub -//! let gossipsub_config = libp2p_gossipsub::GossipsubConfig::default(); +//! let gossipsub_config = libp2p_gossipsub::Config::default(); //! // build a gossipsub network behaviour -//! let mut gossipsub: libp2p_gossipsub::Gossipsub = -//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config).unwrap(); +//! let mut gossipsub: libp2p_gossipsub::Behaviour = +//! libp2p_gossipsub::Behaviour::new(message_authenticity, gossipsub_config).unwrap(); //! // subscribe to the topic //! gossipsub.subscribe(&topic); //! // create the swarm (use an executor in a real example) @@ -156,18 +156,64 @@ mod types; mod rpc_proto; -pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity}; -pub use self::transform::{DataTransform, IdentityTransform}; - -pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubVersion, ValidationMode}; +pub use self::behaviour::{Behaviour, Event, MessageAuthenticity}; +pub use self::config::{Config, ConfigBuilder, ValidationMode, Version}; +pub use self::error::{HandlerError, PublishError, SubscriptionError, ValidationError}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, }; pub use self::topic::{Hasher, Topic, TopicHash}; -pub use self::types::{ - FastMessageId, GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId, - RawGossipsubMessage, -}; +pub use self::transform::{DataTransform, IdentityTransform}; +pub use self::types::{FastMessageId, Message, MessageAcceptance, MessageId, RawMessage, Rpc}; + +#[deprecated( + since = "0.44.0", + note = "Use `Behaviour` instead of `Gossipsub` for Network Behaviour, i.e. `libp2p::gossipsub::Behaviour" +)] +pub type Gossipsub = Behaviour; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Event" +)] +pub type GossipsubEvent = Event; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Config" +)] +pub type GossipsubConfig = Config; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Message" +)] +pub type GossipsubMessage = Message; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Rpc" +)] +pub type GossipsubRpc = Rpc; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` infix, i.e. `libp2p::gossipsub::RawMessage" +)] +pub type RawGossipsubMessage = RawMessage; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::ConfigBuilder" +)] +pub type GossipsubConfigBuilder = ConfigBuilder; + +#[deprecated( + since = "0.44.0", + note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Version" +)] +pub type GossipsubVersion = Version; + pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index ef838c82a8d..01aa6b37269 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::topic::TopicHash; -use crate::types::{MessageId, RawGossipsubMessage}; +use crate::types::{MessageId, RawMessage}; use libp2p_core::PeerId; use log::{debug, trace}; use std::collections::hash_map::Entry; @@ -39,7 +39,7 @@ pub struct CacheEntry { /// MessageCache struct holding history of messages. #[derive(Clone)] pub struct MessageCache { - msgs: HashMap)>, + msgs: HashMap)>, /// For every message and peer the number of times this peer asked for the message iwant_counts: HashMap>, history: Vec>, @@ -73,7 +73,7 @@ impl MessageCache { /// Put a message into the memory cache. /// /// Returns true if the message didn't already exist in the cache. - pub fn put(&mut self, message_id: &MessageId, msg: RawGossipsubMessage) -> bool { + pub fn put(&mut self, message_id: &MessageId, msg: RawMessage) -> bool { match self.msgs.entry(message_id.clone()) { Entry::Occupied(_) => { // Don't add duplicate entries to the cache. @@ -108,7 +108,7 @@ impl MessageCache { /// Get a message with `message_id` #[cfg(test)] - pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { + pub fn get(&self, message_id: &MessageId) -> Option<&RawMessage> { self.msgs.get(message_id).map(|(message, _)| message) } @@ -118,7 +118,7 @@ impl MessageCache { &mut self, message_id: &MessageId, peer: &PeerId, - ) -> Option<(&RawGossipsubMessage, u32)> { + ) -> Option<(&RawMessage, u32)> { let iwant_counts = &mut self.iwant_counts; self.msgs.get(message_id).and_then(|(message, _)| { if !message.validated { @@ -140,10 +140,7 @@ impl MessageCache { /// Gets a message with [`MessageId`] and tags it as validated. /// This function also returns the known peers that have sent us this message. This is used to /// prevent us sending redundant messages to peers who have already propagated it. - pub fn validate( - &mut self, - message_id: &MessageId, - ) -> Option<(&RawGossipsubMessage, HashSet)> { + pub fn validate(&mut self, message_id: &MessageId) -> Option<(&RawMessage, HashSet)> { self.msgs.get_mut(message_id).map(|(message, known_peers)| { message.validated = true; // Clear the known peers list (after a message is validated, it is forwarded and we no @@ -207,10 +204,7 @@ impl MessageCache { } /// Removes a message from the cache and returns it if existent - pub fn remove( - &mut self, - message_id: &MessageId, - ) -> Option<(RawGossipsubMessage, HashSet)> { + pub fn remove(&mut self, message_id: &MessageId) -> Option<(RawMessage, HashSet)> { //We only remove the message from msgs and iwant_count and keep the message_id in the // history vector. Zhe id in the history vector will simply be ignored on popping. @@ -222,12 +216,12 @@ impl MessageCache { #[cfg(test)] mod tests { use super::*; - use crate::types::RawGossipsubMessage; + use crate::types::RawMessage; use crate::{IdentTopic as Topic, TopicHash}; use libp2p_core::PeerId; - fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawGossipsubMessage) { - let default_id = |message: &RawGossipsubMessage| { + fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) { + let default_id = |message: &RawMessage| { // default message id is: source + sequence number let mut source_string = message.source.as_ref().unwrap().to_base58(); source_string.push_str(&message.sequence_number.unwrap().to_string()); @@ -238,7 +232,7 @@ mod tests { let data: Vec = vec![u8x]; let sequence_number = Some(x); - let m = RawGossipsubMessage { + let m = RawMessage { source, data, sequence_number, diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index 52ce8c1d9a9..064e277eed7 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -21,8 +21,8 @@ /// A collection of unit tests mostly ported from the go implementation. use super::*; -use crate::types::RawGossipsubMessage; -use crate::{GossipsubMessage, IdentTopic as Topic}; +use crate::types::RawMessage; +use crate::{IdentTopic as Topic, Message}; // estimates a value within variance fn within_variance(value: f64, expected: f64, variance: f64) -> bool { @@ -33,8 +33,8 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool { } // generates a random gossipsub message with sequence number i -fn make_test_message(seq: u64) -> (MessageId, RawGossipsubMessage) { - let raw_message = RawGossipsubMessage { +fn make_test_message(seq: u64) -> (MessageId, RawMessage) { + let raw_message = RawMessage { source: Some(PeerId::random()), data: vec![12, 34, 56], sequence_number: Some(seq), @@ -44,7 +44,7 @@ fn make_test_message(seq: u64) -> (MessageId, RawGossipsubMessage) { validated: true, }; - let message = GossipsubMessage { + let message = Message { source: raw_message.source, data: raw_message.data.clone(), sequence_number: raw_message.sequence_number, @@ -55,7 +55,7 @@ fn make_test_message(seq: u64) -> (MessageId, RawGossipsubMessage) { (id, raw_message) } -fn default_message_id() -> fn(&GossipsubMessage) -> MessageId { +fn default_message_id() -> fn(&Message) -> MessageId { |message| { // default message id is: source + sequence number // NOTE: If either the peer_id or source is not provided, we set to 0; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 659400ba53c..3524fc85854 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -18,15 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::{GossipsubVersion, ValidationMode}; -use crate::error::{GossipsubHandlerError, ValidationError}; +use crate::config::{ValidationMode, Version}; +use crate::error::{HandlerError, ValidationError}; use crate::handler::HandlerEvent; use crate::topic::TopicHash; use crate::types::{ - GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction, - MessageId, PeerInfo, PeerKind, RawGossipsubMessage, + ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction, }; -use crate::{rpc_proto, GossipsubConfig}; +use crate::{rpc_proto, Config}; use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; @@ -36,7 +35,7 @@ use libp2p_core::{ identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo, }; use log::{debug, warn}; -use prost::Message as ProtobufMessage; +use prost::Message as _; use std::pin::Pin; use unsigned_varint::codec; @@ -57,15 +56,15 @@ impl ProtocolConfig { /// Builds a new [`ProtocolConfig`]. /// /// Sets the maximum gossip transmission size. - pub fn new(gossipsub_config: &GossipsubConfig) -> ProtocolConfig { + pub fn new(gossipsub_config: &Config) -> ProtocolConfig { let protocol_ids = match gossipsub_config.custom_id_version() { Some(v) => match v { - GossipsubVersion::V1_0 => vec![ProtocolId::new( + Version::V1_0 => vec![ProtocolId::new( gossipsub_config.protocol_id(), PeerKind::Gossipsub, false, )], - GossipsubVersion::V1_1 => vec![ProtocolId::new( + Version::V1_1 => vec![ProtocolId::new( gossipsub_config.protocol_id(), PeerKind::Gossipsubv1_1, false, @@ -149,7 +148,7 @@ where TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = (Framed, PeerKind); - type Error = GossipsubHandlerError; + type Error = HandlerError; type Future = Pin> + Send>>; fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { @@ -170,7 +169,7 @@ where TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, { type Output = (Framed, PeerKind); - type Error = GossipsubHandlerError; + type Error = HandlerError; type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { @@ -271,22 +270,18 @@ impl GossipsubCodec { impl Encoder for GossipsubCodec { type Item = rpc_proto::Rpc; - type Error = GossipsubHandlerError; + type Error = HandlerError; - fn encode( - &mut self, - item: Self::Item, - dst: &mut BytesMut, - ) -> Result<(), GossipsubHandlerError> { + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), HandlerError> { Ok(self.codec.encode(item, dst)?) } } impl Decoder for GossipsubCodec { type Item = HandlerEvent; - type Error = GossipsubHandlerError; + type Error = HandlerError; - fn decode(&mut self, src: &mut BytesMut) -> Result, GossipsubHandlerError> { + fn decode(&mut self, src: &mut BytesMut) -> Result, HandlerError> { let rpc = match self.codec.decode(src)? { Some(p) => p, None => return Ok(None), @@ -341,7 +336,7 @@ impl Decoder for GossipsubCodec { // If the initial validation logic failed, add the message to invalid messages and // continue processing the others. if let Some(validation_error) = invalid_kind.take() { - let message = RawGossipsubMessage { + let message = RawMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -361,7 +356,7 @@ impl Decoder for GossipsubCodec { // Build the invalid message (ignoring further validation of sequence number // and source) - let message = RawGossipsubMessage { + let message = RawMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -386,7 +381,7 @@ impl Decoder for GossipsubCodec { seq_no, seq_no.len() ); - let message = RawGossipsubMessage { + let message = RawMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -405,7 +400,7 @@ impl Decoder for GossipsubCodec { } else { // sequence number was not present debug!("Sequence number not present but expected"); - let message = RawGossipsubMessage { + let message = RawMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application @@ -431,7 +426,7 @@ impl Decoder for GossipsubCodec { Err(_) => { // invalid peer id, add to invalid messages debug!("Message source has an invalid PeerId"); - let message = RawGossipsubMessage { + let message = RawMessage { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number, @@ -455,7 +450,7 @@ impl Decoder for GossipsubCodec { }; // This message has passed all validation, add it to the validated messages. - messages.push(RawGossipsubMessage { + messages.push(RawMessage { source, data: message.data.unwrap_or_default(), sequence_number, @@ -470,10 +465,10 @@ impl Decoder for GossipsubCodec { if let Some(rpc_control) = rpc.control { // Collect the gossipsub control messages - let ihave_msgs: Vec = rpc_control + let ihave_msgs: Vec = rpc_control .ihave .into_iter() - .map(|ihave| GossipsubControlAction::IHave { + .map(|ihave| ControlAction::IHave { topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), message_ids: ihave .message_ids @@ -483,10 +478,10 @@ impl Decoder for GossipsubCodec { }) .collect(); - let iwant_msgs: Vec = rpc_control + let iwant_msgs: Vec = rpc_control .iwant .into_iter() - .map(|iwant| GossipsubControlAction::IWant { + .map(|iwant| ControlAction::IWant { message_ids: iwant .message_ids .into_iter() @@ -495,10 +490,10 @@ impl Decoder for GossipsubCodec { }) .collect(); - let graft_msgs: Vec = rpc_control + let graft_msgs: Vec = rpc_control .graft .into_iter() - .map(|graft| GossipsubControlAction::Graft { + .map(|graft| ControlAction::Graft { topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), }) .collect(); @@ -523,7 +518,7 @@ impl Decoder for GossipsubCodec { .collect::>(); let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default()); - prune_msgs.push(GossipsubControlAction::Prune { + prune_msgs.push(ControlAction::Prune { topic_hash, peers, backoff: prune.backoff, @@ -537,16 +532,16 @@ impl Decoder for GossipsubCodec { } Ok(Some(HandlerEvent::Message { - rpc: GossipsubRpc { + rpc: Rpc { messages, subscriptions: rpc .subscriptions .into_iter() - .map(|sub| GossipsubSubscription { + .map(|sub| Subscription { action: if Some(true) == sub.subscribe { - GossipsubSubscriptionAction::Subscribe + SubscriptionAction::Subscribe } else { - GossipsubSubscriptionAction::Unsubscribe + SubscriptionAction::Unsubscribe }, topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), }) @@ -561,23 +556,23 @@ impl Decoder for GossipsubCodec { #[cfg(test)] mod tests { use super::*; - use crate::config::GossipsubConfig; - use crate::Gossipsub; + use crate::config::Config; + use crate::Behaviour; use crate::IdentTopic as Topic; use libp2p_core::identity::Keypair; use quickcheck::*; #[derive(Clone, Debug)] - struct Message(RawGossipsubMessage); + struct Message(RawMessage); impl Arbitrary for Message { fn arbitrary(g: &mut Gen) -> Self { let keypair = TestKeypair::arbitrary(g); // generate an arbitrary GossipsubMessage using the behaviour signing functionality - let config = GossipsubConfig::default(); - let gs: Gossipsub = - Gossipsub::new(crate::MessageAuthenticity::Signed(keypair.0), config).unwrap(); + let config = Config::default(); + let gs: Behaviour = + Behaviour::new(crate::MessageAuthenticity::Signed(keypair.0), config).unwrap(); let data = (0..g.gen_range(10..10024u32)) .map(|_| u8::arbitrary(g)) .collect::>(); @@ -636,7 +631,7 @@ mod tests { fn prop(message: Message) { let message = message.0; - let rpc = GossipsubRpc { + let rpc = Rpc { messages: vec![message], subscriptions: vec![], control_msgs: vec![], diff --git a/protocols/gossipsub/src/subscription_filter.rs b/protocols/gossipsub/src/subscription_filter.rs index ec6cb6756ba..f6b72e09c4e 100644 --- a/protocols/gossipsub/src/subscription_filter.rs +++ b/protocols/gossipsub/src/subscription_filter.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::types::GossipsubSubscription; +use crate::types::Subscription; use crate::TopicHash; use log::debug; use std::collections::{BTreeSet, HashMap, HashSet}; @@ -32,10 +32,10 @@ pub trait TopicSubscriptionFilter { /// [`Self::filter_incoming_subscription_set`] on the filtered set. fn filter_incoming_subscriptions<'a>( &mut self, - subscriptions: &'a [GossipsubSubscription], + subscriptions: &'a [Subscription], currently_subscribed_topics: &BTreeSet, - ) -> Result, String> { - let mut filtered_subscriptions: HashMap = HashMap::new(); + ) -> Result, String> { + let mut filtered_subscriptions: HashMap = HashMap::new(); for subscription in subscriptions { use std::collections::hash_map::Entry::*; match filtered_subscriptions.entry(subscription.topic_hash.clone()) { @@ -59,9 +59,9 @@ pub trait TopicSubscriptionFilter { /// By default this filters the elements based on [`Self::allow_incoming_subscription`]. fn filter_incoming_subscription_set<'a>( &mut self, - mut subscriptions: HashSet<&'a GossipsubSubscription>, + mut subscriptions: HashSet<&'a Subscription>, _currently_subscribed_topics: &BTreeSet, - ) -> Result, String> { + ) -> Result, String> { subscriptions.retain(|s| { if self.allow_incoming_subscription(s) { true @@ -78,7 +78,7 @@ pub trait TopicSubscriptionFilter { /// whether to filter out a subscription or not. /// By default this uses can_subscribe to decide the same for incoming subscriptions as for /// outgoing ones. - fn allow_incoming_subscription(&mut self, subscription: &GossipsubSubscription) -> bool { + fn allow_incoming_subscription(&mut self, subscription: &Subscription) -> bool { self.can_subscribe(&subscription.topic_hash) } } @@ -119,9 +119,9 @@ impl TopicSubscriptionFilter for MaxCountSubscriptio fn filter_incoming_subscriptions<'a>( &mut self, - subscriptions: &'a [GossipsubSubscription], + subscriptions: &'a [Subscription], currently_subscribed_topics: &BTreeSet, - ) -> Result, String> { + ) -> Result, String> { if subscriptions.len() > self.max_subscriptions_per_request { return Err("too many subscriptions per request".into()); } @@ -129,7 +129,7 @@ impl TopicSubscriptionFilter for MaxCountSubscriptio .filter .filter_incoming_subscriptions(subscriptions, currently_subscribed_topics)?; - use crate::types::GossipsubSubscriptionAction::*; + use crate::types::SubscriptionAction::*; let mut unsubscribed = 0; let mut new_subscribed = 0; @@ -176,9 +176,9 @@ where fn filter_incoming_subscription_set<'a>( &mut self, - subscriptions: HashSet<&'a GossipsubSubscription>, + subscriptions: HashSet<&'a Subscription>, currently_subscribed_topics: &BTreeSet, - ) -> Result, String> { + ) -> Result, String> { let intermediate = self .filter1 .filter_incoming_subscription_set(subscriptions, currently_subscribed_topics)?; @@ -217,8 +217,8 @@ pub mod regex { #[cfg(test)] mod test { use super::*; - use crate::types::GossipsubSubscription; - use crate::types::GossipsubSubscriptionAction::*; + use crate::types::Subscription; + use crate::types::SubscriptionAction::*; #[test] fn test_regex_subscription_filter() { @@ -230,15 +230,15 @@ pub mod regex { let old = Default::default(); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t1, }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t2, }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t3, }, @@ -255,7 +255,7 @@ pub mod regex { #[cfg(test)] mod test { use super::*; - use crate::types::GossipsubSubscriptionAction::*; + use crate::types::SubscriptionAction::*; use std::iter::FromIterator; #[test] @@ -267,23 +267,23 @@ mod test { let old = BTreeSet::from_iter(vec![t1.clone()].into_iter()); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Unsubscribe, topic_hash: t1.clone(), }, - GossipsubSubscription { + Subscription { action: Unsubscribe, topic_hash: t2.clone(), }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t2, }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t1.clone(), }, - GossipsubSubscription { + Subscription { action: Unsubscribe, topic_hash: t1, }, @@ -304,11 +304,11 @@ mod test { let old = Default::default(); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t1, }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t2, }, @@ -333,15 +333,15 @@ mod test { let old = Default::default(); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t1.clone(), }, - GossipsubSubscription { + Subscription { action: Unsubscribe, topic_hash: t1.clone(), }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t1, }, @@ -366,11 +366,11 @@ mod test { let old = t[0..2].iter().cloned().collect(); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t[2].clone(), }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t[3].clone(), }, @@ -395,23 +395,23 @@ mod test { let old = t[0..2].iter().cloned().collect(); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t[4].clone(), }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t[2].clone(), }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t[3].clone(), }, - GossipsubSubscription { + Subscription { action: Unsubscribe, topic_hash: t[0].clone(), }, - GossipsubSubscription { + Subscription { action: Unsubscribe, topic_hash: t[1].clone(), }, @@ -432,11 +432,11 @@ mod test { let old = Default::default(); let subscriptions = vec![ - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t1, }, - GossipsubSubscription { + Subscription { action: Subscribe, topic_hash: t2, }, diff --git a/protocols/gossipsub/src/transform.rs b/protocols/gossipsub/src/transform.rs index ed11c78e5e0..6f57d9fc46b 100644 --- a/protocols/gossipsub/src/transform.rs +++ b/protocols/gossipsub/src/transform.rs @@ -25,25 +25,22 @@ //! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then //! calculated, allowing for applications to employ message-id functions post compression. -use crate::{GossipsubMessage, RawGossipsubMessage, TopicHash}; +use crate::{Message, RawMessage, TopicHash}; -/// A general trait of transforming a [`RawGossipsubMessage`] into a [`GossipsubMessage`]. The -/// [`RawGossipsubMessage`] is obtained from the wire and the [`GossipsubMessage`] is used to +/// A general trait of transforming a [`RawMessage`] into a [`Message`]. The +/// [`RawMessage`] is obtained from the wire and the [`Message`] is used to /// calculate the [`crate::MessageId`] of the message and is what is sent to the application. /// /// The inbound/outbound transforms must be inverses. Applying the inbound transform and then the /// outbound transform MUST leave the underlying data un-modified. /// -/// By default, this is the identity transform for all fields in [`GossipsubMessage`]. +/// By default, this is the identity transform for all fields in [`Message`]. pub trait DataTransform { - /// Takes a [`RawGossipsubMessage`] received and converts it to a [`GossipsubMessage`]. - fn inbound_transform( - &self, - raw_message: RawGossipsubMessage, - ) -> Result; + /// Takes a [`RawMessage`] received and converts it to a [`Message`]. + fn inbound_transform(&self, raw_message: RawMessage) -> Result; /// Takes the data to be published (a topic and associated data) transforms the data. The - /// transformed data will then be used to create a [`crate::RawGossipsubMessage`] to be sent to peers. + /// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers. fn outbound_transform( &self, topic: &TopicHash, @@ -56,11 +53,8 @@ pub trait DataTransform { pub struct IdentityTransform; impl DataTransform for IdentityTransform { - fn inbound_transform( - &self, - raw_message: RawGossipsubMessage, - ) -> Result { - Ok(GossipsubMessage { + fn inbound_transform(&self, raw_message: RawMessage) -> Result { + Ok(Message { source: raw_message.source, data: raw_message.data, sequence_number: raw_message.sequence_number, diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 1fc7f450e13..03e6fc05a1d 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -24,7 +24,7 @@ use crate::TopicHash; use libp2p_core::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; -use prost::Message; +use prost::Message as _; use std::fmt; use std::fmt::Debug; @@ -110,7 +110,7 @@ pub enum PeerKind { /// A message received by the gossipsub system and stored locally in caches.. #[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub struct RawGossipsubMessage { +pub struct RawMessage { /// Id of the peer that published this message. pub source: Option, @@ -133,7 +133,7 @@ pub struct RawGossipsubMessage { pub validated: bool, } -impl RawGossipsubMessage { +impl RawMessage { /// Calculates the encoded length of this message (used for calculating metrics). pub fn raw_protobuf_len(&self) -> usize { let message = rpc_proto::Message { @@ -148,10 +148,10 @@ impl RawGossipsubMessage { } } -/// The message sent to the user after a [`RawGossipsubMessage`] has been transformed by a +/// The message sent to the user after a [`RawMessage`] has been transformed by a /// [`crate::DataTransform`]. #[derive(Clone, PartialEq, Eq, Hash)] -pub struct GossipsubMessage { +pub struct Message { /// Id of the peer that published this message. pub source: Option, @@ -165,9 +165,9 @@ pub struct GossipsubMessage { pub topic: TopicHash, } -impl fmt::Debug for GossipsubMessage { +impl fmt::Debug for Message { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GossipsubMessage") + f.debug_struct("Message") .field( "data", &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)), @@ -181,16 +181,16 @@ impl fmt::Debug for GossipsubMessage { /// A subscription received by the gossipsub system. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct GossipsubSubscription { +pub struct Subscription { /// Action to perform. - pub action: GossipsubSubscriptionAction, + pub action: SubscriptionAction, /// The topic from which to subscribe or unsubscribe. pub topic_hash: TopicHash, } /// Action that a subscription wants to perform. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum GossipsubSubscriptionAction { +pub enum SubscriptionAction { /// The remote wants to subscribe to the given topic. Subscribe, /// The remote wants to unsubscribe from the given topic. @@ -207,7 +207,7 @@ pub struct PeerInfo { /// A Control message received by the gossipsub system. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum GossipsubControlAction { +pub enum ControlAction { /// Node broadcasts known messages per topic - IHave control message. IHave { /// The topic of the messages. @@ -238,16 +238,16 @@ pub enum GossipsubControlAction { /// An RPC received/sent. #[derive(Clone, PartialEq, Eq, Hash)] -pub struct GossipsubRpc { +pub struct Rpc { /// List of messages that were part of this RPC query. - pub messages: Vec, + pub messages: Vec, /// List of subscriptions. - pub subscriptions: Vec, + pub subscriptions: Vec, /// List of Gossipsub control messages. - pub control_msgs: Vec, + pub control_msgs: Vec, } -impl GossipsubRpc { +impl Rpc { /// Converts the GossipsubRPC into its protobuf format. // A convenience function to avoid explicitly specifying types. pub fn into_protobuf(self) -> rpc_proto::Rpc { @@ -255,9 +255,9 @@ impl GossipsubRpc { } } -impl From for rpc_proto::Rpc { +impl From for rpc_proto::Rpc { /// Converts the RPC into protobuf format. - fn from(rpc: GossipsubRpc) -> Self { + fn from(rpc: Rpc) -> Self { // Messages let mut publish = Vec::new(); @@ -279,7 +279,7 @@ impl From for rpc_proto::Rpc { .subscriptions .into_iter() .map(|sub| rpc_proto::rpc::SubOpts { - subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe), + subscribe: Some(sub.action == SubscriptionAction::Subscribe), topic_id: Some(sub.topic_hash.into_string()), }) .collect::>(); @@ -297,7 +297,7 @@ impl From for rpc_proto::Rpc { for action in rpc.control_msgs { match action { // collect all ihave messages - GossipsubControlAction::IHave { + ControlAction::IHave { topic_hash, message_ids, } => { @@ -307,19 +307,19 @@ impl From for rpc_proto::Rpc { }; control.ihave.push(rpc_ihave); } - GossipsubControlAction::IWant { message_ids } => { + ControlAction::IWant { message_ids } => { let rpc_iwant = rpc_proto::ControlIWant { message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }; control.iwant.push(rpc_iwant); } - GossipsubControlAction::Graft { topic_hash } => { + ControlAction::Graft { topic_hash } => { let rpc_graft = rpc_proto::ControlGraft { topic_id: Some(topic_hash.into_string()), }; control.graft.push(rpc_graft); } - GossipsubControlAction::Prune { + ControlAction::Prune { topic_hash, peers, backoff, @@ -353,7 +353,7 @@ impl From for rpc_proto::Rpc { } } -impl fmt::Debug for GossipsubRpc { +impl fmt::Debug for Rpc { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut b = f.debug_struct("GossipsubRpc"); if !self.messages.is_empty() { diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 40605d2aab8..c5a67ecbff3 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -32,20 +32,17 @@ use futures::StreamExt; use libp2p_core::{ identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport, }; -use libp2p_gossipsub::{ - Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, MessageAuthenticity, - ValidationMode, -}; +use libp2p_gossipsub as gossipsub; use libp2p_plaintext::PlainText2Config; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_yamux as yamux; struct Graph { - pub nodes: Vec<(Multiaddr, Swarm)>, + pub nodes: Vec<(Multiaddr, Swarm)>, } impl Future for Graph { - type Output = (Multiaddr, GossipsubEvent); + type Output = (Multiaddr, gossipsub::Event); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { for (addr, node) in &mut self.nodes { @@ -77,7 +74,7 @@ impl Graph { .cycle() .take(num_nodes) .map(|_| build_node()) - .collect::)>>(); + .collect::)>>(); let mut connected_nodes = vec![not_connected_nodes.pop().unwrap()]; @@ -112,7 +109,7 @@ impl Graph { /// `true`. /// /// Returns [`true`] on success and [`false`] on timeout. - fn wait_for bool>(&mut self, mut f: F) -> bool { + fn wait_for bool>(&mut self, mut f: F) -> bool { let fut = futures::future::poll_fn(move |cx| match self.poll_unpin(cx) { Poll::Ready((_addr, ev)) if f(&ev) => Poll::Ready(()), _ => Poll::Pending, @@ -143,7 +140,7 @@ impl Graph { } } -fn build_node() -> (Multiaddr, Swarm) { +fn build_node() -> (Multiaddr, Swarm) { let key = identity::Keypair::generate_ed25519(); let public_key = key.public(); @@ -162,15 +159,16 @@ fn build_node() -> (Multiaddr, Swarm) { // reduce the default values of the heartbeat, so that all nodes will receive gossip in a // timely fashion. - let config = GossipsubConfigBuilder::default() + let config = gossipsub::ConfigBuilder::default() .heartbeat_initial_delay(Duration::from_millis(100)) .heartbeat_interval(Duration::from_millis(200)) .history_length(10) .history_gossip(10) - .validation_mode(ValidationMode::Permissive) + .validation_mode(gossipsub::ValidationMode::Permissive) .build() .unwrap(); - let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id), config).unwrap(); + let behaviour = + gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Author(peer_id), config).unwrap(); let mut swarm = Swarm::without_executor(transport, behaviour, peer_id); let port = 1 + random::(); @@ -197,7 +195,7 @@ fn multi_hop_propagation() { let number_nodes = graph.nodes.len(); // Subscribe each node to the same topic. - let topic = Topic::new("test-net"); + let topic = gossipsub::IdentTopic::new("test-net"); for (_addr, node) in &mut graph.nodes { node.behaviour_mut().subscribe(&topic).unwrap(); } @@ -205,7 +203,7 @@ fn multi_hop_propagation() { // Wait for all nodes to be subscribed. let mut subscribed = 0; let all_subscribed = graph.wait_for(move |ev| { - if let GossipsubEvent::Subscribed { .. } = ev { + if let gossipsub::Event::Subscribed { .. } = ev { subscribed += 1; if subscribed == (number_nodes - 1) * 2 { return true; @@ -234,7 +232,7 @@ fn multi_hop_propagation() { // Wait for all nodes to receive the published message. let mut received_msgs = 0; let all_received = graph.wait_for(move |ev| { - if let GossipsubEvent::Message { .. } = ev { + if let gossipsub::Event::Message { .. } = ev { received_msgs += 1; if received_msgs == number_nodes - 1 { return true;