diff --git a/Cargo.toml b/Cargo.toml
index b8813dfce3f..7e66f3876ea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ libp2p-mplex = { version = "0.14.0-alpha.1", path = "muxers/mplex" }
 libp2p-identify = { version = "0.14.0-alpha.1", path = "protocols/identify" }
 libp2p-kad = { version = "0.14.0-alpha.1", path = "protocols/kad" }
 libp2p-floodsub = { version = "0.14.0-alpha.1", path = "protocols/floodsub" }
+libp2p-gossipsub = { version = "0.14.0-alpha.1", path = "./protocols/gossipsub" }
 libp2p-ping = { version = "0.14.0-alpha.1", path = "protocols/ping" }
 libp2p-plaintext = { version = "0.14.0-alpha.1", path = "protocols/plaintext" }
 libp2p-core = { version = "0.14.0-alpha.1", path = "core" }
@@ -62,6 +63,7 @@ members = [
     "muxers/mplex",
     "muxers/yamux",
     "protocols/floodsub",
+    "protocols/gossipsub",
     "protocols/identify",
     "protocols/kad",
     "protocols/noise",
diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs
new file mode 100644
index 00000000000..4cbbb7c2673
--- /dev/null
+++ b/examples/gossipsub-chat.rs
@@ -0,0 +1,154 @@
+// Copyright 2018 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+//! A basic chat application with logs demonstrating libp2p and the gossipsub protocol.
+//!
+//! Using two terminal windows, start two instances. Type a message in either terminal and hit return: the
+//! message is sent and printed in the other terminal. Close with Ctrl-c.
+//!
+//! You can of course open more terminal windows and add more participants.
+//! Dialing any of the other peers will propagate the new participant to all
+//! chat members and everyone will receive all messages.
+//!
+//! In order to get the nodes to connect, take note of the listening address of the first
+//! instance and start the second with this address as the first argument. In the first terminal
+//! window, run:
+//!
+//! ```sh
+//! cargo run --example gossipsub-chat
+//! ```
+//!
+//! It will print the PeerId and the listening address, e.g. `Listening on
+//! "/ip4/0.0.0.0/tcp/24915"`
+//!
+//! In the second terminal window, start a new instance of the example with:
+//!
+//! ```sh
+//! cargo run --example gossipsub-chat -- /ip4/127.0.0.1/tcp/24915
+//! ```
+//!
+//! The two nodes should then connect.
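+//!
+//! The example initialises `env_logger` with a default filter of `info`; if more detail is
+//! needed, the `RUST_LOG` environment variable (for example `RUST_LOG=libp2p_gossipsub=debug`)
+//! can be set to see the gossipsub behaviour's own log output.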
+
+use async_std::{io, task};
+use env_logger::{Builder, Env};
+use futures::prelude::*;
+use libp2p::gossipsub::protocol::MessageId;
+use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, Topic};
+use libp2p::{
+    gossipsub, identity,
+    PeerId,
+};
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+use std::time::Duration;
+use std::{error::Error, task::{Context, Poll}};
+
+fn main() -> Result<(), Box<dyn Error>> {
+    Builder::from_env(Env::default().default_filter_or("info")).init();
+
+    // Create a random PeerId
+    let local_key = identity::Keypair::generate_ed25519();
+    let local_peer_id = PeerId::from(local_key.public());
+    println!("Local peer id: {:?}", local_peer_id);
+
+    // Set up an encrypted TCP Transport over the Mplex and Yamux protocols
+    let transport = libp2p::build_development_transport(local_key)?;
+
+    // Create a Gossipsub topic
+    let topic = Topic::new("test-net".into());
+
+    // Create a Swarm to manage peers and events
+    let mut swarm = {
+        // To use the default gossipsub parameters:
+        // let gossipsub_config = gossipsub::GossipsubConfig::default();
+
+        // To content-address messages, we can take the hash of the message and use it as an ID.
+        let message_id_fn = |message: &GossipsubMessage| {
+            let mut s = DefaultHasher::new();
+            message.data.hash(&mut s);
+            MessageId(s.finish().to_string())
+        };
+
+        // Set a custom gossipsub configuration
+        let gossipsub_config = gossipsub::GossipsubConfigBuilder::new()
+            .heartbeat_interval(Duration::from_secs(10))
+            .message_id_fn(message_id_fn) // content-address messages. No two messages of the
+            // same content will be propagated.
+            .build();
+        // build a gossipsub network behaviour
+        let mut gossipsub = gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config);
+        gossipsub.subscribe(topic.clone());
+        libp2p::Swarm::new(transport, gossipsub, local_peer_id)
+    };
+
+    // Listen on all interfaces and whatever port the OS assigns
+    libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
+
+    // Reach out to another node if specified
+    if let Some(to_dial) = std::env::args().nth(1) {
+        let dialing = to_dial.clone();
+        match to_dial.parse() {
+            Ok(to_dial) => match libp2p::Swarm::dial_addr(&mut swarm, to_dial) {
+                Ok(_) => println!("Dialed {:?}", dialing),
+                Err(e) => println!("Dial {:?} failed: {:?}", dialing, e),
+            },
+            Err(err) => println!("Failed to parse address to dial: {:?}", err),
+        }
+    }
+
+    // Read full lines from stdin
+    let mut stdin = io::BufReader::new(io::stdin()).lines();
+
+    // Kick it off
+    let mut listening = false;
+    task::block_on(future::poll_fn(move |cx: &mut Context| {
+        loop {
+            match stdin.try_poll_next_unpin(cx)?
{ + Poll::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), + Poll::Ready(None) => panic!("Stdin closed"), + Poll::Pending => break, + }; + } + + loop { + match swarm.poll_next_unpin(cx) { + Poll::Ready(Some(gossip_event)) => match gossip_event { + GossipsubEvent::Message(peer_id, id, message) => println!( + "Got message: {} with id: {} from peer: {:?}", + String::from_utf8_lossy(&message.data), + id, + peer_id + ), + _ => {} + }, + Poll::Ready(None) | Poll::Pending => break, + } + } + + if !listening { + for addr in libp2p::Swarm::listeners(&swarm) { + println!("Listening on {:?}", addr); + listening = true; + } + } + + Poll::Pending + })) +} diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml new file mode 100644 index 00000000000..fab3d9ee01e --- /dev/null +++ b/protocols/gossipsub/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "libp2p-gossipsub" +edition = "2018" +version = "0.14.0-alpha.1" +authors = ["Age Manning "] +license = "MIT" + +[dependencies] +libp2p-swarm = { version = "0.4.0-alpha.1", path = "../../swarm" } +libp2p-core = { version = "0.14.0-alpha.1", path = "../../core" } +bs58 = "0.3.0" +bytes = "0.5.4" +byteorder = "1.3.2" +fnv = "1.0.6" +futures = "0.3.1" +rand = "0.7.3" +futures_codec = "0.3.4" +wasm-timer = "0.2.4" +unsigned-varint = { version = "0.3.0", features = ["futures-codec"] } +log = "0.4.8" +sha2 = "0.8.1" +base64 = "0.11.0" +lru = "0.4.3" +smallvec = "1.1.0" +prost = "0.6.1" + +[dev-dependencies] +async-std = "1.4.0" +env_logger = "0.7.1" +libp2p-plaintext = { version = "0.14.0-alpha.1", path = "../plaintext" } +libp2p-yamux = { version = "0.14.0-alpha.1", path = "../../muxers/yamux" } +quickcheck = "0.9.2" + +[build-dependencies] +prost-build = "0.6" diff --git a/protocols/gossipsub/build.rs b/protocols/gossipsub/build.rs new file mode 100644 index 00000000000..3de5b750ca2 --- /dev/null +++ b/protocols/gossipsub/build.rs @@ -0,0 +1,24 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +fn main() { + prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap(); +} + diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs new file mode 100644 index 00000000000..981cd178338 --- /dev/null +++ b/protocols/gossipsub/src/behaviour.rs @@ -0,0 +1,1211 @@ +// Copyright 2020 Sigma Prime Pty Ltd. 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::config::GossipsubConfig;
+use crate::handler::GossipsubHandler;
+use crate::mcache::MessageCache;
+use crate::protocol::{
+    GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
+    MessageId,
+};
+use crate::topic::{Topic, TopicHash};
+use futures::prelude::*;
+use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
+use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
+use log::{debug, error, info, trace, warn};
+use lru::LruCache;
+use rand;
+use rand::{seq::SliceRandom, thread_rng};
+use std::{
+    collections::hash_map::HashMap,
+    collections::HashSet,
+    collections::VecDeque,
+    iter,
+    marker::PhantomData,
+    sync::Arc,
+    task::{Context, Poll},
+};
+use wasm_timer::{Instant, Interval};
+
+mod tests;
+
+/// Network behaviour that handles the gossipsub protocol.
+pub struct Gossipsub<TSubstream> {
+    /// Configuration providing gossipsub performance parameters.
+    config: GossipsubConfig,
+
+    /// Events that need to be yielded to the outside when polling.
+    events: VecDeque<NetworkBehaviourAction<Arc<GossipsubRpc>, GossipsubEvent>>,
+
+    /// Pools non-urgent control messages between heartbeats.
+    control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
+
+    /// Peer id of the local node. Used for the source of the messages that we publish.
+    local_peer_id: PeerId,
+
+    /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
+    topic_peers: HashMap<TopicHash, Vec<PeerId>>,
+
+    /// A map of all connected peers to their subscribed topics.
+    peer_topics: HashMap<PeerId, Vec<TopicHash>>,
+
+    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
+    mesh: HashMap<TopicHash, Vec<PeerId>>,
+
+    /// Map of topics to list of peers that we publish to, but don't subscribe to.
+    fanout: HashMap<TopicHash, Vec<PeerId>>,
+
+    /// The last publish time for fanout topics.
+    fanout_last_pub: HashMap<TopicHash, Instant>,
+
+    /// Message cache for the last few heartbeats.
+    mcache: MessageCache,
+
+    // We keep track of the messages we received (in the format `string(source ID, seq_no)`) so that
+    // we don't dispatch the same message twice if we receive it twice on the network.
+    received: LruCache<MessageId, ()>,
+
+    /// Heartbeat interval stream.
+    heartbeat: Interval,
+
+    /// Marker to pin the generics.
+    marker: PhantomData<TSubstream>,
+}
+
+impl<TSubstream> Gossipsub<TSubstream> {
+    /// Creates a `Gossipsub` struct given a set of parameters specified by `gs_config`.
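+    ///
+    /// A minimal sketch of constructing the behaviour (the peer id and configuration below are
+    /// placeholders):
+    ///
+    /// ```ignore
+    /// let gossipsub = Gossipsub::new(PeerId::random(), GossipsubConfig::default());
+    /// ```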
+ pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self { + let local_peer_id = if gs_config.no_source_id { + PeerId::from_bytes(crate::config::IDENTITY_SOURCE.to_vec()).expect("Valid peer id") + } else { + local_peer_id + }; + + Gossipsub { + config: gs_config.clone(), + events: VecDeque::new(), + control_pool: HashMap::new(), + local_peer_id, + topic_peers: HashMap::new(), + peer_topics: HashMap::new(), + mesh: HashMap::new(), + fanout: HashMap::new(), + fanout_last_pub: HashMap::new(), + mcache: MessageCache::new( + gs_config.history_gossip, + gs_config.history_length, + gs_config.message_id_fn, + ), + received: LruCache::new(256), // keep track of the last 256 messages + heartbeat: Interval::new_at( + Instant::now() + gs_config.heartbeat_initial_delay, + gs_config.heartbeat_interval, + ), + marker: PhantomData, + } + } + + /// Subscribe to a topic. + /// + /// Returns true if the subscription worked. Returns false if we were already subscribed. + pub fn subscribe(&mut self, topic: Topic) -> bool { + debug!("Subscribing to topic: {}", topic); + let topic_hash = self.topic_hash(topic.clone()); + if self.mesh.get(&topic_hash).is_some() { + debug!("Topic: {} is already in the mesh.", topic); + return false; + } + + // send subscription request to all peers in the topic + if let Some(peer_list) = self.topic_peers.get(&topic_hash) { + let mut fixed_event = None; // initialise the event once if needed + if fixed_event.is_none() { + fixed_event = Some(Arc::new(GossipsubRpc { + messages: Vec::new(), + subscriptions: vec![GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }], + control_msgs: Vec::new(), + })); + } + + let event = fixed_event.expect("event has been initialised"); + + for peer in peer_list { + debug!("Sending SUBSCRIBE to peer: {:?}", peer); + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: peer.clone(), + event: event.clone(), + }); + } + } + + // call JOIN(topic) + // this will add new peers to the mesh for the topic + self.join(&topic_hash); + info!("Subscribed to topic: {}", topic); + true + } + + /// Unsubscribes from a topic. + /// + /// Returns true if we were subscribed to this topic. + pub fn unsubscribe(&mut self, topic: Topic) -> bool { + debug!("Unsubscribing from topic: {}", topic); + let topic_hash = &self.topic_hash(topic); + + if self.mesh.get(topic_hash).is_none() { + debug!("Already unsubscribed from topic: {:?}", topic_hash); + // we are not subscribed + return false; + } + + // announce to all peers in the topic + let mut fixed_event = None; // initialise the event once if needed + if let Some(peer_list) = self.topic_peers.get(topic_hash) { + if fixed_event.is_none() { + fixed_event = Some(Arc::new(GossipsubRpc { + messages: Vec::new(), + subscriptions: vec![GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Unsubscribe, + }], + control_msgs: Vec::new(), + })); + } + + let event = fixed_event.expect("event has been initialised"); + + for peer in peer_list { + debug!("Sending UNSUBSCRIBE to peer: {:?}", peer); + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: peer.clone(), + event: event.clone(), + }); + } + } + + // call LEAVE(topic) + // this will remove the topic from the mesh + self.leave(&topic_hash); + + info!("Unsubscribed from topic: {:?}", topic_hash); + true + } + + /// Publishes a message to the network. 
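+    ///
+    /// A minimal usage sketch (the topic name is illustrative and `gossipsub` is assumed to be a
+    /// `Gossipsub` instance; if it is not subscribed to the topic, fanout peers are used instead):
+    ///
+    /// ```ignore
+    /// let topic = Topic::new("example-topic".into());
+    /// gossipsub.subscribe(topic.clone());
+    /// gossipsub.publish(&topic, "hello world".as_bytes());
+    /// ```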
+    pub fn publish(&mut self, topic: &Topic, data: impl Into<Vec<u8>>) {
+        self.publish_many(iter::once(topic.clone()), data)
+    }
+
+    /// Publishes a message with multiple topics to the network.
+    pub fn publish_many(
+        &mut self,
+        topic: impl IntoIterator<Item = Topic>,
+        data: impl Into<Vec<u8>>,
+    ) {
+        let message = GossipsubMessage {
+            source: self.local_peer_id.clone(),
+            data: data.into(),
+            // To be interoperable with the go-implementation this is treated as a 64-bit
+            // big-endian uint.
+            sequence_number: rand::random(),
+            topics: topic.into_iter().map(|t| self.topic_hash(t)).collect(),
+        };
+
+        debug!(
+            "Publishing message: {:?}",
+            (self.config.message_id_fn)(&message)
+        );
+
+        // forward the message to mesh peers
+        let local_peer_id = self.local_peer_id.clone();
+        self.forward_msg(message.clone(), &local_peer_id);
+
+        let mut recipient_peers = HashSet::new();
+        for topic_hash in &message.topics {
+            // if not subscribed to the topic, use fanout peers
+            if self.mesh.get(&topic_hash).is_none() {
+                debug!("Topic: {:?} not in the mesh", topic_hash);
+                // build a list of peers to forward the message to
+                // if we have fanout peers add them to the map
+                if self.fanout.contains_key(&topic_hash) {
+                    for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
+                        recipient_peers.insert(peer.clone());
+                    }
+                } else {
+                    // we have no fanout peers, select mesh_n of them and add them to the fanout
+                    let mesh_n = self.config.mesh_n;
+                    let new_peers =
+                        Self::get_random_peers(&self.topic_peers, &topic_hash, mesh_n, {
+                            |_| true
+                        });
+                    // add the new peers to the fanout and recipient peers
+                    self.fanout.insert(topic_hash.clone(), new_peers.clone());
+                    for peer in new_peers {
+                        debug!("Peer added to fanout: {:?}", peer);
+                        recipient_peers.insert(peer.clone());
+                    }
+                }
+                // we are publishing to fanout peers - update the time we published
+                self.fanout_last_pub
+                    .insert(topic_hash.clone(), Instant::now());
+            }
+        }
+
+        // add published message to our received caches
+        let msg_id = (self.config.message_id_fn)(&message);
+        self.mcache.put(message.clone());
+        self.received.put(msg_id.clone(), ());
+
+        info!("Published message: {:?}", msg_id);
+
+        let event = Arc::new(GossipsubRpc {
+            subscriptions: Vec::new(),
+            messages: vec![message],
+            control_msgs: Vec::new(),
+        });
+        // Send to peers we know are subscribed to the topic.
+        for peer_id in recipient_peers.iter() {
+            debug!("Sending message to peer: {:?}", peer_id);
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: peer_id.clone(),
+                event: event.clone(),
+            });
+        }
+    }
+
+    /// This function should be called when `config.manual_propagation` is `true` in order to
+    /// propagate messages. Messages are stored in the [`MessageCache`] and validation is expected
+    /// to be fast enough that the messages should still exist in the cache.
+    ///
+    /// Calling this function will propagate a message stored in the cache, if it still exists:
+    /// the message is forwarded and this function returns true, otherwise it returns false.
+    pub fn propagate_message(
+        &mut self,
+        message_id: &MessageId,
+        propagation_source: &PeerId,
+    ) -> bool {
+        let message = match self.mcache.get(message_id) {
+            Some(message) => message.clone(),
+            None => {
+                warn!(
+                    "Message not in cache. Ignoring forwarding. Message Id: {}",
+                    message_id.0
+                );
+                return false;
+            }
+        };
+        self.forward_msg(message, propagation_source);
+        true
+    }
+
+    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
+ fn join(&mut self, topic_hash: &TopicHash) { + debug!("Running JOIN for topic: {:?}", topic_hash); + + // if we are already in the mesh, return + if self.mesh.contains_key(topic_hash) { + info!("JOIN: The topic is already in the mesh, ignoring JOIN"); + return; + } + + let mut added_peers = vec![]; + + // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do, + // removing the fanout entry. + if let Some((_, peers)) = self.fanout.remove_entry(topic_hash) { + debug!( + "JOIN: Removing peers from the fanout for topic: {:?}", + topic_hash + ); + // add up to mesh_n of them them to the mesh + // Note: These aren't randomly added, currently FIFO + let add_peers = std::cmp::min(peers.len(), self.config.mesh_n); + debug!( + "JOIN: Adding {:?} peers from the fanout for topic: {:?}", + add_peers, topic_hash + ); + added_peers.extend_from_slice(&peers[..add_peers]); + self.mesh + .insert(topic_hash.clone(), peers[..add_peers].to_vec()); + // remove the last published time + self.fanout_last_pub.remove(topic_hash); + } + + // check if we need to get more peers, which we randomly select + if added_peers.len() < self.config.mesh_n { + // get the peers + let new_peers = Self::get_random_peers( + &self.topic_peers, + topic_hash, + self.config.mesh_n - added_peers.len(), + { |_| true }, + ); + added_peers.extend_from_slice(&new_peers); + // add them to the mesh + debug!( + "JOIN: Inserting {:?} random peers into the mesh", + new_peers.len() + ); + let mesh_peers = self + .mesh + .entry(topic_hash.clone()) + .or_insert_with(|| Vec::new()); + mesh_peers.extend_from_slice(&new_peers); + } + + for peer_id in added_peers { + // Send a GRAFT control message + info!("JOIN: Sending Graft message to peer: {:?}", peer_id); + Self::control_pool_add( + &mut self.control_pool, + peer_id.clone(), + GossipsubControlAction::Graft { + topic_hash: topic_hash.clone(), + }, + ); + } + debug!("Completed JOIN for topic: {:?}", topic_hash); + } + + /// Gossipsub LEAVE(topic) - Notifies mesh[topic] peers with PRUNE messages. + fn leave(&mut self, topic_hash: &TopicHash) { + debug!("Running LEAVE for topic {:?}", topic_hash); + + // if our mesh contains the topic, send prune to peers and delete it from the mesh + if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { + for peer in peers { + // Send a PRUNE control message + info!("LEAVE: Sending PRUNE to peer: {:?}", peer); + Self::control_pool_add( + &mut self.control_pool, + peer.clone(), + GossipsubControlAction::Prune { + topic_hash: topic_hash.clone(), + }, + ); + } + } + debug!("Completed LEAVE for topic: {:?}", topic_hash); + } + + /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown, + /// requests it with an IWANT control message. 
+    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
+        debug!("Handling IHAVE for peer: {:?}", peer_id);
+        // use a hashset to avoid duplicates efficiently
+        let mut iwant_ids = HashSet::new();
+
+        for (topic, ids) in ihave_msgs {
+            // only process the message if we are subscribed
+            if !self.mesh.contains_key(&topic) {
+                debug!(
+                    "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
+                    topic
+                );
+                continue;
+            }
+
+            for id in ids {
+                if !self.received.contains(&id) {
+                    // have not seen this message, request it
+                    iwant_ids.insert(id);
+                }
+            }
+        }
+
+        if !iwant_ids.is_empty() {
+            // Send the list of IWANT control messages
+            debug!("IHAVE: Sending IWANT message");
+            Self::control_pool_add(
+                &mut self.control_pool,
+                peer_id.clone(),
+                GossipsubControlAction::IWant {
+                    message_ids: iwant_ids.iter().cloned().collect(),
+                },
+            );
+        }
+        debug!("Completed IHAVE handling for peer: {:?}", peer_id);
+    }
+
+    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
+    /// forwarded to the requesting peer.
+    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
+        debug!("Handling IWANT for peer: {:?}", peer_id);
+        // build a hashmap of available messages
+        let mut cached_messages = HashMap::new();
+
+        for id in iwant_msgs {
+            // if we have it, add it to the cached_messages mapping
+            if let Some(msg) = self.mcache.get(&id) {
+                cached_messages.insert(id.clone(), msg.clone());
+            }
+        }
+
+        if !cached_messages.is_empty() {
+            debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
+            // Send the messages to the peer
+            let message_list = cached_messages.into_iter().map(|entry| entry.1).collect();
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: peer_id.clone(),
+                event: Arc::new(GossipsubRpc {
+                    subscriptions: Vec::new(),
+                    messages: message_list,
+                    control_msgs: Vec::new(),
+                }),
+            });
+        }
+        debug!("Completed IWANT handling for peer: {:?}", peer_id);
+    }
+
+    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
+    /// responds with PRUNE messages.
+    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
+        debug!("Handling GRAFT message for peer: {:?}", peer_id);
+
+        let mut to_prune_topics = HashSet::new();
+        for topic_hash in topics {
+            if let Some(peers) = self.mesh.get_mut(&topic_hash) {
+                // if we are subscribed, add peer to the mesh, if not already added
+                info!(
+                    "GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
+                    peer_id, topic_hash
+                );
+                // ensure peer is not already added
+                if !peers.contains(peer_id) {
+                    peers.push(peer_id.clone());
+                }
+            } else {
+                to_prune_topics.insert(topic_hash.clone());
+            }
+        }
+
+        if !to_prune_topics.is_empty() {
+            // build the prune messages to send
+            let prune_messages = to_prune_topics
+                .iter()
+                .map(|t| GossipsubControlAction::Prune {
+                    topic_hash: t.clone(),
+                })
+                .collect();
+            // Send the prune messages to the peer
+            info!(
+                "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {:?}",
+                peer_id
+            );
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: peer_id.clone(),
+                event: Arc::new(GossipsubRpc {
+                    subscriptions: Vec::new(),
+                    messages: Vec::new(),
+                    control_msgs: prune_messages,
+                }),
+            });
+        }
+        debug!("Completed GRAFT handling for peer: {:?}", peer_id);
+    }
+
+    /// Handles PRUNE control messages. Removes peer from the mesh.
+    fn handle_prune(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
+        debug!("Handling PRUNE message for peer: {:?}", peer_id);
+        for topic_hash in topics {
+            if let Some(peers) = self.mesh.get_mut(&topic_hash) {
+                // remove the peer if it exists in the mesh
+                info!(
+                    "PRUNE: Removing peer: {:?} from the mesh for topic: {:?}",
+                    peer_id, topic_hash
+                );
+                peers.retain(|p| p != peer_id);
+            }
+        }
+        debug!("Completed PRUNE handling for peer: {:?}", peer_id);
+    }
+
+    /// Handles a newly received GossipsubMessage.
+    /// Forwards the message to all peers in the mesh.
+    fn handle_received_message(&mut self, msg: GossipsubMessage, propagation_source: &PeerId) {
+        let msg_id = (self.config.message_id_fn)(&msg);
+        debug!(
+            "Handling message: {:?} from peer: {:?}",
+            msg_id, propagation_source
+        );
+        if self.received.put(msg_id.clone(), ()).is_some() {
+            debug!("Message already received, ignoring. Message: {:?}", msg_id);
+            return;
+        }
+
+        // add to the memcache
+        self.mcache.put(msg.clone());
+
+        // dispatch the message to the user
+        if self.mesh.keys().any(|t| msg.topics.iter().any(|u| t == u)) {
+            debug!("Sending received message to user");
+            self.events.push_back(NetworkBehaviourAction::GenerateEvent(
+                GossipsubEvent::Message(propagation_source.clone(), msg_id, msg.clone()),
+            ));
+        }
+
+        // forward the message to mesh peers, if no validation is required
+        if !self.config.manual_propagation {
+            let message_id = (self.config.message_id_fn)(&msg);
+            self.forward_msg(msg, propagation_source);
+            debug!("Completed message handling for message: {:?}", message_id);
+        }
+    }
+
+    /// Handles received subscriptions.
+    fn handle_received_subscriptions(
+        &mut self,
+        subscriptions: &[GossipsubSubscription],
+        propagation_source: &PeerId,
+    ) {
+        debug!(
+            "Handling subscriptions: {:?}, from source: {:?}",
+            subscriptions, propagation_source
+        );
+        let subscribed_topics = match self.peer_topics.get_mut(&propagation_source) {
+            Some(topics) => topics,
+            None => {
+                error!("Subscription by unknown peer: {:?}", &propagation_source);
+                return;
+            }
+        };
+
+        for subscription in subscriptions {
+            // get the peers from the mapping, or insert empty lists if topic doesn't exist
+            let peer_list = self
+                .topic_peers
+                .entry(subscription.topic_hash.clone())
+                .or_insert_with(Vec::new);
+
+            match subscription.action {
+                GossipsubSubscriptionAction::Subscribe => {
+                    if !peer_list.contains(&propagation_source) {
+                        debug!(
+                            "SUBSCRIPTION: topic_peer: Adding gossip peer: {:?} to topic: {:?}",
+                            propagation_source, subscription.topic_hash
+                        );
+                        peer_list.push(propagation_source.clone());
+                    }
+
+                    // add to the peer_topics mapping
+                    if !subscribed_topics.contains(&subscription.topic_hash) {
+                        info!(
+                            "SUBSCRIPTION: Adding peer: {:?} to topic: {:?}",
+                            propagation_source, subscription.topic_hash
+                        );
+                        subscribed_topics.push(subscription.topic_hash.clone());
+                    }
+
+                    // if the mesh needs peers add the peer to the mesh
+                    if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
+                        if peers.len() < self.config.mesh_n_low {
+                            debug!(
+                                "SUBSCRIPTION: Adding peer {:?} to the mesh",
+                                propagation_source,
+                            );
+                        }
+                        peers.push(propagation_source.clone());
+                    }
+                    // generates a subscription event to be polled
+                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(
+                        GossipsubEvent::Subscribed {
+                            peer_id: propagation_source.clone(),
+                            topic: subscription.topic_hash.clone(),
+                        },
+                    ));
+                }
+                GossipsubSubscriptionAction::Unsubscribe => {
+                    if let Some(pos) = peer_list.iter().position(|p| p == propagation_source) {
+                        info!(
"SUBSCRIPTION: Removing gossip peer: {:?} from topic: {:?}", + propagation_source, subscription.topic_hash + ); + peer_list.remove(pos); + } + // remove topic from the peer_topics mapping + if let Some(pos) = subscribed_topics + .iter() + .position(|t| t == &subscription.topic_hash) + { + subscribed_topics.remove(pos); + } + // remove the peer from the mesh if it exists + if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { + peers.retain(|peer| peer != propagation_source); + } + + // generate an unsubscribe event to be polled + self.events.push_back(NetworkBehaviourAction::GenerateEvent( + GossipsubEvent::Unsubscribed { + peer_id: propagation_source.clone(), + topic: subscription.topic_hash.clone(), + }, + )); + } + } + } + trace!( + "Completed handling subscriptions from source: {:?}", + propagation_source + ); + } + + /// Heartbeat function which shifts the memcache and updates the mesh. + fn heartbeat(&mut self) { + debug!("Starting heartbeat"); + + let mut to_graft = HashMap::new(); + let mut to_prune = HashMap::new(); + + // maintain the mesh for each topic + for (topic_hash, peers) in self.mesh.iter_mut() { + // too little peers - add some + if peers.len() < self.config.mesh_n_low { + debug!( + "HEARTBEAT: Mesh low. Topic: {:?} Contains: {:?} needs: {:?}", + topic_hash.clone().into_string(), + peers.len(), + self.config.mesh_n_low + ); + // not enough peers - get mesh_n - current_length more + let desired_peers = self.config.mesh_n - peers.len(); + let peer_list = + Self::get_random_peers(&self.topic_peers, topic_hash, desired_peers, { + |peer| !peers.contains(peer) + }); + for peer in &peer_list { + let current_topic = to_graft.entry(peer.clone()).or_insert_with(|| vec![]); + current_topic.push(topic_hash.clone()); + } + // update the mesh + debug!("Updating mesh, new mesh: {:?}", peer_list); + peers.extend(peer_list); + } + + // too many peers - remove some + if peers.len() > self.config.mesh_n_high { + debug!( + "HEARTBEAT: Mesh high. Topic: {:?} Contains: {:?} needs: {:?}", + topic_hash, + peers.len(), + self.config.mesh_n_high + ); + let excess_peer_no = peers.len() - self.config.mesh_n; + // shuffle the peers + let mut rng = thread_rng(); + peers.shuffle(&mut rng); + // remove the first excess_peer_no peers adding them to to_prune + for _ in 0..excess_peer_no { + let peer = peers + .pop() + .expect("There should always be enough peers to remove"); + let current_topic = to_prune.entry(peer).or_insert_with(|| vec![]); + current_topic.push(topic_hash.clone()); + } + } + } + + // remove expired fanout topics + { + let fanout = &mut self.fanout; // help the borrow checker + let fanout_ttl = self.config.fanout_ttl; + self.fanout_last_pub.retain(|topic_hash, last_pub_time| { + if *last_pub_time + fanout_ttl < Instant::now() { + debug!( + "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}", + topic_hash + ); + fanout.remove(&topic_hash); + return false; + } + true + }); + } + + // maintain fanout + // check if our peers are still a part of the topic + for (topic_hash, peers) in self.fanout.iter_mut() { + let mut to_remove_peers = Vec::new(); + for peer in peers.iter() { + // is the peer still subscribed to the topic? 
+                match self.peer_topics.get(peer) {
+                    Some(topics) => {
+                        if !topics.contains(&topic_hash) {
+                            debug!(
+                                "HEARTBEAT: Peer removed from fanout for topic: {:?}",
+                                topic_hash
+                            );
+                            to_remove_peers.push(peer.clone());
+                        }
+                    }
+                    None => {
+                        // remove if the peer has disconnected
+                        to_remove_peers.push(peer.clone());
+                    }
+                }
+            }
+            peers.retain(|peer| !to_remove_peers.contains(&peer));
+
+            // not enough peers
+            if peers.len() < self.config.mesh_n {
+                debug!(
+                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
+                    peers.len(),
+                    self.config.mesh_n
+                );
+                let needed_peers = self.config.mesh_n - peers.len();
+                let new_peers =
+                    Self::get_random_peers(&self.topic_peers, topic_hash, needed_peers, |peer| {
+                        !peers.contains(peer)
+                    });
+                peers.extend(new_peers);
+            }
+        }
+
+        self.emit_gossip();
+
+        // send graft/prunes
+        if !to_graft.is_empty() || !to_prune.is_empty() {
+            self.send_graft_prune(to_graft, to_prune);
+        }
+
+        // piggyback pooled control messages
+        self.flush_control_pool();
+
+        // shift the memcache
+        self.mcache.shift();
+        debug!("Completed Heartbeat");
+    }
+
+    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
+    /// and fanout peers.
+    fn emit_gossip(&mut self) {
+        debug!("Started gossip");
+        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
+            let message_ids = self.mcache.get_gossip_ids(&topic_hash);
+            if message_ids.is_empty() {
+                return;
+            }
+
+            // get gossip_lazy random peers
+            let to_msg_peers = Self::get_random_peers(
+                &self.topic_peers,
+                &topic_hash,
+                self.config.gossip_lazy,
+                |peer| !peers.contains(peer),
+            );
+            for peer in to_msg_peers {
+                // send an IHAVE message
+                Self::control_pool_add(
+                    &mut self.control_pool,
+                    peer.clone(),
+                    GossipsubControlAction::IHave {
+                        topic_hash: topic_hash.clone(),
+                        message_ids: message_ids.clone(),
+                    },
+                );
+            }
+        }
+        debug!("Completed gossip");
+    }
+
+    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
+    /// messages.
+    fn send_graft_prune(
+        &mut self,
+        to_graft: HashMap<PeerId, Vec<TopicHash>>,
+        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
+    ) {
+        // handle the grafts and overlapping prunes
+        for (peer, topics) in to_graft.iter() {
+            let mut grafts: Vec<GossipsubControlAction> = topics
+                .iter()
+                .map(|topic_hash| GossipsubControlAction::Graft {
+                    topic_hash: topic_hash.clone(),
+                })
+                .collect();
+            let mut prunes: Vec<GossipsubControlAction> = to_prune
+                .remove(&peer)
+                .unwrap_or_else(|| vec![])
+                .iter()
+                .map(|topic_hash| GossipsubControlAction::Prune {
+                    topic_hash: topic_hash.clone(),
+                })
+                .collect();
+            grafts.append(&mut prunes);
+
+            // send the control messages
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: peer.clone(),
+                event: Arc::new(GossipsubRpc {
+                    subscriptions: Vec::new(),
+                    messages: Vec::new(),
+                    control_msgs: grafts,
+                }),
+            });
+        }
+
+        // handle the remaining prunes
+        for (peer, topics) in to_prune.iter() {
+            let remaining_prunes = topics
+                .iter()
+                .map(|topic_hash| GossipsubControlAction::Prune {
+                    topic_hash: topic_hash.clone(),
+                })
+                .collect();
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: peer.clone(),
+                event: Arc::new(GossipsubRpc {
+                    subscriptions: Vec::new(),
+                    messages: Vec::new(),
+                    control_msgs: remaining_prunes,
+                }),
+            });
+        }
+    }
+
+    /// Helper function which forwards a message to mesh[topic] peers.
+    fn forward_msg(&mut self, message: GossipsubMessage, source: &PeerId) {
+        let msg_id = (self.config.message_id_fn)(&message);
+        debug!("Forwarding message: {:?}", msg_id);
+        let mut recipient_peers = HashSet::new();
+
+        // add mesh peers
+        for topic in &message.topics {
+            // mesh
+            if let Some(mesh_peers) = self.mesh.get(&topic) {
+                for peer_id in mesh_peers {
+                    if peer_id != source {
+                        recipient_peers.insert(peer_id.clone());
+                    }
+                }
+            }
+        }
+
+        // forward the message to peers
+        if !recipient_peers.is_empty() {
+            let event = Arc::new(GossipsubRpc {
+                subscriptions: Vec::new(),
+                messages: vec![message.clone()],
+                control_msgs: Vec::new(),
+            });
+
+            for peer in recipient_peers.iter() {
+                debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
+                self.events.push_back(NetworkBehaviourAction::SendEvent {
+                    peer_id: peer.clone(),
+                    event: event.clone(),
+                });
+            }
+        }
+        debug!("Completed forwarding message");
+    }
+
+    /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
+    /// filtered by the function `f`.
+    fn get_random_peers(
+        topic_peers: &HashMap<TopicHash, Vec<PeerId>>,
+        topic_hash: &TopicHash,
+        n: usize,
+        mut f: impl FnMut(&PeerId) -> bool,
+    ) -> Vec<PeerId> {
+        let mut gossip_peers = match topic_peers.get(topic_hash) {
+            // if they exist, filter the peers by `f`
+            Some(peer_list) => peer_list.iter().cloned().filter(|p| f(p)).collect(),
+            None => Vec::new(),
+        };
+
+        // if we have less than needed, return them
+        if gossip_peers.len() <= n {
+            debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
+            return gossip_peers.to_vec();
+        }
+
+        // we have more peers than needed, shuffle them and return n of them
+        let mut rng = thread_rng();
+        gossip_peers.partial_shuffle(&mut rng, n);
+
+        debug!("RANDOM PEERS: Got {:?} peers", n);
+
+        gossip_peers[..n].to_vec()
+    }
+
+    // adds a control action to control_pool
+    fn control_pool_add(
+        control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
+        peer: PeerId,
+        control: GossipsubControlAction,
+    ) {
+        control_pool
+            .entry(peer.clone())
+            .or_insert_with(Vec::new)
+            .push(control);
+    }
+
+    /// Produces a `TopicHash` for a topic given the gossipsub configuration.
+    fn topic_hash(&self, topic: Topic) -> TopicHash {
+        if self.config.hash_topics {
+            topic.sha256_hash()
+        } else {
+            topic.no_hash()
+        }
+    }
+
+    /// Takes each control action mapping and turns it into a message
+    fn flush_control_pool(&mut self) {
+        for (peer, controls) in self.control_pool.drain() {
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: peer,
+                event: Arc::new(GossipsubRpc {
+                    subscriptions: Vec::new(),
+                    messages: Vec::new(),
+                    control_msgs: controls,
+                }),
+            });
+        }
+    }
+}
+
+impl<TSubstream> NetworkBehaviour for Gossipsub<TSubstream>
+where
+    TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+{
+    type ProtocolsHandler = GossipsubHandler<TSubstream>;
+    type OutEvent = GossipsubEvent;
+
+    fn new_handler(&mut self) -> Self::ProtocolsHandler {
+        GossipsubHandler::new(
+            self.config.protocol_id.clone(),
+            self.config.max_transmit_size,
+        )
+    }
+
+    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
+        Vec::new()
+    }
+
+    fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
+        info!("New peer connected: {:?}", id);
+        // We need to send our subscriptions to the newly-connected node.
+        let mut subscriptions = vec![];
+        for topic_hash in self.mesh.keys() {
+            subscriptions.push(GossipsubSubscription {
+                topic_hash: topic_hash.clone(),
+                action: GossipsubSubscriptionAction::Subscribe,
+            });
+        }
+
+        if !subscriptions.is_empty() {
+            // send our subscriptions to the peer
+            self.events.push_back(NetworkBehaviourAction::SendEvent {
+                peer_id: id.clone(),
+                event: Arc::new(GossipsubRpc {
+                    messages: Vec::new(),
+                    subscriptions,
+                    control_msgs: Vec::new(),
+                }),
+            });
+        }
+
+        // For the time being assume all gossipsub peers
+        self.peer_topics.insert(id.clone(), Vec::new());
+    }
+
+    fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) {
+        // remove from mesh, topic_peers, peer_topic and fanout
+        debug!("Peer disconnected: {:?}", id);
+        {
+            let topics = match self.peer_topics.get(&id) {
+                Some(topics) => topics,
+                None => {
+                    warn!("Disconnected node, not in connected nodes");
+                    return;
+                }
+            };
+
+            // remove peer from all mappings
+            for topic in topics {
+                // check the mesh for the topic
+                if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
+                    // check if the peer is in the mesh and remove it
+                    if let Some(pos) = mesh_peers.iter().position(|p| p == id) {
+                        mesh_peers.remove(pos);
+                    }
+                }
+
+                // remove from topic_peers
+                if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
+                    if let Some(pos) = peer_list.iter().position(|p| p == id) {
+                        peer_list.remove(pos);
+                    }
+                    // debugging purposes
+                    else {
+                        warn!("Disconnected node: {:?} not in topic_peers peer list", &id);
+                    }
+                } else {
+                    warn!(
+                        "Disconnected node: {:?} with topic: {:?} not in topic_peers",
+                        &id, &topic
+                    );
+                }
+
+                // remove from fanout
+                self.fanout
+                    .get_mut(&topic)
+                    .map(|peers| peers.retain(|p| p != id));
+            }
+        }
+
+        // remove peer from peer_topics
+        let was_in = self.peer_topics.remove(id);
+        debug_assert!(was_in.is_some());
+    }
+
+    fn inject_node_event(&mut self, propagation_source: PeerId, event: GossipsubRpc) {
+        // Handle subscriptions
+        // Update connected peers topics
+        self.handle_received_subscriptions(&event.subscriptions, &propagation_source);
+
+        // Handle messages
+        for message in event.messages {
+            self.handle_received_message(message, &propagation_source);
+        }
+
+        // Handle control messages
+        // group some control messages, this minimises SendEvents (code is simplified to handle each event at a time however)
+        let mut ihave_msgs = vec![];
+        let mut graft_msgs = vec![];
+        let mut prune_msgs = vec![];
+        for control_msg in event.control_msgs {
+            match control_msg {
+                GossipsubControlAction::IHave {
+                    topic_hash,
+                    message_ids,
+                } => {
+                    ihave_msgs.push((topic_hash, message_ids));
+                }
+                GossipsubControlAction::IWant { message_ids } => {
+                    self.handle_iwant(&propagation_source, message_ids)
+                }
+                GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
+                GossipsubControlAction::Prune { topic_hash } => prune_msgs.push(topic_hash),
+            }
+        }
+        if !ihave_msgs.is_empty() {
+            self.handle_ihave(&propagation_source, ihave_msgs);
+        }
+        if !graft_msgs.is_empty() {
+            self.handle_graft(&propagation_source, graft_msgs);
+        }
+        if !prune_msgs.is_empty() {
+            self.handle_prune(&propagation_source, prune_msgs);
+        }
+    }
+
+    fn poll(
+        &mut self,
+        cx: &mut Context,
+        _: &mut impl PollParameters,
+    ) -> Poll<
+        NetworkBehaviourAction<
+            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
+            Self::OutEvent,
+        >,
+    > {
+        if let Some(event) = self.events.pop_front() {
+            // clone send event reference if other references are present
+            match event {
+                NetworkBehaviourAction::SendEvent {
+                    peer_id,
+                    event: send_event,
+                } =>
match Arc::try_unwrap(send_event) { + Ok(event) => { + return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }); + } + Err(event) => { + return Poll::Ready(NetworkBehaviourAction::SendEvent { + peer_id, + event: (*event).clone(), + }); + } + }, + NetworkBehaviourAction::GenerateEvent(e) => { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)); + } + NetworkBehaviourAction::DialAddress { address } => { + return Poll::Ready(NetworkBehaviourAction::DialAddress { address }); + } + NetworkBehaviourAction::DialPeer { peer_id } => { + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }); + } + NetworkBehaviourAction::ReportObservedAddr { address } => { + return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }); + } + } + } + + while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) { + self.heartbeat(); + } + + Poll::Pending + } +} + +/// An RPC received/sent. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct GossipsubRpc { + /// List of messages that were part of this RPC query. + pub messages: Vec, + /// List of subscriptions. + pub subscriptions: Vec, + /// List of Gossipsub control messages. + pub control_msgs: Vec, +} + +/// Event that can happen on the gossipsub behaviour. +#[derive(Debug)] +pub enum GossipsubEvent { + /// A message has been received. This contains the PeerId that we received the message from, + /// the message id (used if the application layer needs to propagate the message) and the + /// message itself. + Message(PeerId, MessageId, GossipsubMessage), + + /// A remote subscribed to a topic. + Subscribed { + /// Remote that has subscribed. + peer_id: PeerId, + /// The topic it has subscribed to. + topic: TopicHash, + }, + + /// A remote unsubscribed from a topic. + Unsubscribed { + /// Remote that has unsubscribed. + peer_id: PeerId, + /// The topic it has subscribed from. + topic: TopicHash, + }, +} diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs new file mode 100644 index 00000000000..ac5ce5e1c57 --- /dev/null +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -0,0 +1,864 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. 
+
+
+// collection of tests for the gossipsub network behaviour
+
+#[cfg(test)]
+mod tests {
+    use super::super::*;
+    use async_std::net::TcpStream;
+
+    // helper functions for testing
+
+    // This function generates `peer_no` random PeerId's, subscribes to `topics` and subscribes the
+    // injected nodes to all topics if `to_subscribe` is set. All nodes are considered gossipsub nodes.
+    fn build_and_inject_nodes(
+        peer_no: usize,
+        topics: Vec<String>,
+        to_subscribe: bool,
+    ) -> (
+        Gossipsub<TcpStream>,
+        Vec<PeerId>,
+        Vec<TopicHash>,
+    ) {
+        // generate a default GossipsubConfig
+        let gs_config = GossipsubConfig::default();
+        // create a gossipsub struct
+        let mut gs: Gossipsub<TcpStream> = Gossipsub::new(PeerId::random(), gs_config);
+
+        let mut topic_hashes = vec![];
+
+        // subscribe to the topics
+        for t in topics {
+            let topic = Topic::new(t);
+            gs.subscribe(topic.clone());
+            topic_hashes.push(topic.no_hash().clone());
+        }
+
+        // build and connect peer_no random peers
+        let mut peers = vec![];
+        let dummy_connected_point = ConnectedPoint::Dialer {
+            address: "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
+        };
+
+        for _ in 0..peer_no {
+            let peer = PeerId::random();
+            peers.push(peer.clone());
+            <Gossipsub<TcpStream> as NetworkBehaviour>::inject_connected(
+                &mut gs,
+                peer.clone(),
+                dummy_connected_point.clone(),
+            );
+            if to_subscribe {
+                gs.handle_received_subscriptions(
+                    &topic_hashes
+                        .iter()
+                        .cloned()
+                        .map(|t| GossipsubSubscription {
+                            action: GossipsubSubscriptionAction::Subscribe,
+                            topic_hash: t,
+                        })
+                        .collect::<Vec<_>>(),
+                    &peer,
+                );
+            };
+        }
+
+        return (gs, peers, topic_hashes);
+    }
+
+    #[test]
+    /// Test local node subscribing to a topic
+    fn test_subscribe() {
+        // The node should:
+        // - Create an empty vector in mesh[topic]
+        // - Send subscription request to all peers
+        // - run JOIN(topic)
+
+        let subscribe_topic = vec![String::from("test_subscribe")];
+        let (gs, _, topic_hashes) = build_and_inject_nodes(20, subscribe_topic, true);
+
+        assert!(
+            gs.mesh.get(&topic_hashes[0]).is_some(),
+            "Subscribe should add a new entry to the mesh[topic] hashmap"
+        );
+
+        // collect all the subscriptions
+        let subscriptions =
+            gs.events
+                .iter()
+                .fold(vec![], |mut collected_subscriptions, e| match e {
+                    NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
+                        for s in &event.subscriptions {
+                            match s.action {
+                                GossipsubSubscriptionAction::Subscribe => {
+                                    collected_subscriptions.push(s.clone())
+                                }
+                                _ => {}
+                            };
+                        }
+                        collected_subscriptions
+                    }
+                    _ => collected_subscriptions,
+                });
+
+        // we sent a subscribe to all known peers
+        assert!(
+            subscriptions.len() == 20,
+            "Should send a subscription to all known peers"
+        );
+    }
+
+    #[test]
+    /// Test unsubscribe.
+    fn test_unsubscribe() {
+        // Unsubscribe should:
+        // - Remove the mesh entry for topic
+        // - Send UNSUBSCRIBE to all known peers
+        // - Call Leave
+
+        let topic_strings = vec![String::from("topic1"), String::from("topic2")];
+        let topics = topic_strings
+            .iter()
+            .map(|t| Topic::new(t.clone()))
+            .collect::<Vec<Topic>>();
+
+        // subscribe to topic_strings
+        let (mut gs, _, topic_hashes) = build_and_inject_nodes(20, topic_strings, true);
+
+        for topic_hash in &topic_hashes {
+            assert!(
+                gs.topic_peers.get(&topic_hash).is_some(),
+                "topic_peers should contain a topic entry"
+            );
+            assert!(
+                gs.mesh.get(&topic_hash).is_some(),
+                "mesh should contain a topic entry"
+            );
+        }
+
+        // unsubscribe from both topics
+        assert!(
+            gs.unsubscribe(topics[0].clone()),
+            "should be able to unsubscribe successfully from each topic",
+        );
+        assert!(
+            gs.unsubscribe(topics[1].clone()),
+            "should be able to unsubscribe successfully from each topic",
+        );
+
+        let subscriptions =
+            gs.events
+                .iter()
+                .fold(vec![], |mut collected_subscriptions, e| match e {
+                    NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
+                        for s in &event.subscriptions {
+                            match s.action {
+                                GossipsubSubscriptionAction::Unsubscribe => {
+                                    collected_subscriptions.push(s.clone())
+                                }
+                                _ => {}
+                            };
+                        }
+                        collected_subscriptions
+                    }
+                    _ => collected_subscriptions,
+                });
+
+        // we sent an unsubscribe to all known peers, for two topics
+        assert!(
+            subscriptions.len() == 40,
+            "Should send an unsubscribe event to all known peers"
+        );
+
+        // check we clean up internal structures
+        for topic_hash in &topic_hashes {
+            assert!(
+                gs.mesh.get(&topic_hash).is_none(),
+                "All topics should have been removed from the mesh"
+            );
+        }
+    }
+
+    #[test]
+    /// Test JOIN(topic) functionality.
+    fn test_join() {
+        // The Join function should:
+        // - Remove peers from fanout[topic]
+        // - Add any fanout[topic] peers to the mesh (up to mesh_n)
+        // - Fill up to mesh_n peers from known gossipsub peers in the topic
+        // - Send GRAFT messages to all nodes added to the mesh
+
+        // This test is not an isolated unit test, rather it uses higher level
+        // subscribe/unsubscribe to perform the test.
+
+        let topic_strings = vec![String::from("topic1"), String::from("topic2")];
+        let topics = topic_strings
+            .iter()
+            .map(|t| Topic::new(t.clone()))
+            .collect::<Vec<Topic>>();
+
+        let (mut gs, _, topic_hashes) = build_and_inject_nodes(20, topic_strings, true);
+
+        // unsubscribe, then call join to invoke functionality
+        assert!(
+            gs.unsubscribe(topics[0].clone()),
+            "should be able to unsubscribe successfully"
+        );
+        assert!(
+            gs.unsubscribe(topics[1].clone()),
+            "should be able to unsubscribe successfully"
+        );
+
+        // re-subscribe - there should be peers associated with the topic
+        assert!(
+            gs.subscribe(topics[0].clone()),
+            "should be able to subscribe successfully"
+        );
+
+        // should have added mesh_n nodes to the mesh
+        assert!(
+            gs.mesh.get(&topic_hashes[0]).unwrap().len() == 6,
+            "Should have added 6 nodes to the mesh"
+        );
+
+        // there should be mesh_n GRAFT messages.
+ let graft_messages = + gs.control_pool + .iter() + .fold(vec![], |mut collected_grafts, (_, controls)| { + for c in controls.iter() { + match c { + GossipsubControlAction::Graft { topic_hash: _ } => { + collected_grafts.push(c.clone()) + } + _ => {} + } + } + collected_grafts + }); + + assert_eq!( + graft_messages.len(), + 6, + "There should be 6 grafts messages sent to peers" + ); + + // verify fanout nodes + // add 3 random peers to the fanout[topic1] + gs.fanout.insert(topic_hashes[1].clone(), vec![]); + let new_peers = vec![]; + for _ in 0..3 { + let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); + fanout_peers.push(PeerId::random()); + } + + // subscribe to topic1 + gs.subscribe(topics[1].clone()); + + // the three new peers should have been added, along with 3 more from the pool. + assert!( + gs.mesh.get(&topic_hashes[1]).unwrap().len() == 6, + "Should have added 6 nodes to the mesh" + ); + let mesh_peers = gs.mesh.get(&topic_hashes[1]).unwrap(); + for new_peer in new_peers { + assert!( + mesh_peers.contains(new_peer), + "Fanout peer should be included in the mesh" + ); + } + + // there should now be 12 graft messages to be sent + let graft_messages = + gs.control_pool + .iter() + .fold(vec![], |mut collected_grafts, (_, controls)| { + for c in controls.iter() { + match c { + GossipsubControlAction::Graft { topic_hash: _ } => { + collected_grafts.push(c.clone()) + } + _ => {} + } + } + collected_grafts + }); + + assert!( + graft_messages.len() == 12, + "There should be 12 grafts messages sent to peers" + ); + } + + /// Test local node publish to subscribed topic + #[test] + fn test_publish() { + // node should: + // - Send publish message to all peers + // - Insert message into gs.mcache and gs.received + + let publish_topic = String::from("test_publish"); + let (mut gs, _, topic_hashes) = + build_and_inject_nodes(20, vec![publish_topic.clone()], true); + + assert!( + gs.mesh.get(&topic_hashes[0]).is_some(), + "Subscribe should add a new entry to the mesh[topic] hashmap" + ); + + // publish on topic + let publish_data = vec![0; 42]; + gs.publish(&Topic::new(publish_topic), publish_data); + + // Collect all publish messages + let publishes = gs + .events + .iter() + .fold(vec![], |mut collected_publish, e| match e { + NetworkBehaviourAction::SendEvent { peer_id: _, event } => { + for s in &event.messages { + collected_publish.push(s.clone()); + } + collected_publish + } + _ => collected_publish, + }); + + let msg_id = + (gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries")); + + assert!( + publishes.len() == 20, + "Should send a publish message to all known peers" + ); + + assert!( + gs.mcache.get(&msg_id).is_some(), + "Message cache should contain published message" + ); + assert!( + gs.received.get(&msg_id).is_some(), + "Received cache should contain published message" + ); + } + + /// Test local node publish to unsubscribed topic + #[test] + fn test_fanout() { + // node should: + // - Populate fanout peers + // - Send publish message to fanout peers + // - Insert message into gs.mcache and gs.received + let fanout_topic = String::from("test_fanout"); + let (mut gs, _, topic_hashes) = + build_and_inject_nodes(20, vec![fanout_topic.clone()], true); + + assert!( + gs.mesh.get(&topic_hashes[0]).is_some(), + "Subscribe should add a new entry to the mesh[topic] hashmap" + ); + // Unsubscribe from topic + assert!( + gs.unsubscribe(Topic::new(fanout_topic.clone())), + "should be able to unsubscribe successfully from topic" + ); + + // Publish on 
unsubscribed topic
+        let publish_data = vec![0; 42];
+        gs.publish(&Topic::new(fanout_topic.clone()), publish_data);
+
+        assert_eq!(
+            gs.fanout
+                .get(&TopicHash::from_raw(fanout_topic.clone()))
+                .unwrap()
+                .len(),
+            gs.config.mesh_n,
+            "Fanout should contain `mesh_n` peers for fanout topic"
+        );
+
+        // Collect all publish messages
+        let publishes = gs
+            .events
+            .iter()
+            .fold(vec![], |mut collected_publish, e| match e {
+                NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
+                    for s in &event.messages {
+                        collected_publish.push(s.clone());
+                    }
+                    collected_publish
+                }
+                _ => collected_publish,
+            });
+
+        let msg_id =
+            (gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries"));
+
+        assert_eq!(
+            publishes.len(),
+            gs.config.mesh_n,
+            "Should send a publish message to `mesh_n` fanout peers"
+        );
+
+        assert!(
+            gs.mcache.get(&msg_id).is_some(),
+            "Message cache should contain published message"
+        );
+        assert!(
+            gs.received.get(&msg_id).is_some(),
+            "Received cache should contain published message"
+        );
+    }
+
+    #[test]
+    /// Test the gossipsub NetworkBehaviour peer connection logic.
+    fn test_inject_connected() {
+        let (gs, peers, topic_hashes) = build_and_inject_nodes(
+            20,
+            vec![String::from("topic1"), String::from("topic2")],
+            true,
+        );
+
+        // check that our subscriptions are sent to each of the peers
+        // collect all the SendEvents
+        let send_events: Vec<&NetworkBehaviourAction<Arc<GossipsubRpc>, GossipsubEvent>> = gs
+            .events
+            .iter()
+            .filter(|e| match e {
+                NetworkBehaviourAction::SendEvent {
+                    peer_id: _,
+                    event: _,
+                } => true,
+                _ => false,
+            })
+            .collect();
+
+        // check that there are two subscriptions sent to each peer
+        for sevent in send_events.clone() {
+            match sevent {
+                NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
+                    assert!(
+                        event.subscriptions.len() == 2,
+                        "There should be two subscriptions sent to each peer (1 for each topic)."
+                    );
+                }
+                _ => {}
+            };
+        }
+
+        // check that there are 20 send events created
+        assert!(
+            send_events.len() == 20,
+            "There should be a subscription event sent to each peer."
+        );
+
+        // should add the new peers to `peer_topics` with an empty vec as a gossipsub node
+        for peer in peers {
+            let known_topics = gs.peer_topics.get(&peer).unwrap();
+            assert!(
+                known_topics == &topic_hashes,
+                "The topics for each node should be all topics"
+            );
+        }
+    }
+
+    #[test]
+    /// Test subscription handling
+    fn test_handle_received_subscriptions() {
+        // For every subscription:
+        // SUBSCRIBE:   - Add subscribed topic to peer_topics for peer.
+        //              - Add peer to topics_peer.
+        // UNSUBSCRIBE  - Remove topic from peer_topics for peer.
+        //              - Remove peer from topic_peers.
+ + let topics = vec!["topic1", "topic2", "topic3", "topic4"] + .iter() + .map(|&t| String::from(t)) + .collect(); + let (mut gs, peers, topic_hashes) = build_and_inject_nodes(20, topics, false); + + // The first peer sends 3 subscriptions and 1 unsubscription + let mut subscriptions = topic_hashes[..3] + .iter() + .map(|topic_hash| GossipsubSubscription { + action: GossipsubSubscriptionAction::Subscribe, + topic_hash: topic_hash.clone(), + }) + .collect::>(); + + subscriptions.push(GossipsubSubscription { + action: GossipsubSubscriptionAction::Unsubscribe, + topic_hash: topic_hashes[topic_hashes.len() - 1].clone(), + }); + + let unknown_peer = PeerId::random(); + // process the subscriptions + // first and second peers send subscriptions + gs.handle_received_subscriptions(&subscriptions, &peers[0]); + gs.handle_received_subscriptions(&subscriptions, &peers[1]); + // unknown peer sends the same subscriptions + gs.handle_received_subscriptions(&subscriptions, &unknown_peer); + + // verify the result + + let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); + assert!( + peer_topics == topic_hashes[..3].to_vec(), + "First peer should be subscribed to three topics" + ); + let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone(); + assert!( + peer_topics == topic_hashes[..3].to_vec(), + "Second peer should be subscribed to three topics" + ); + + assert!( + gs.peer_topics.get(&unknown_peer).is_none(), + "Unknown peer should not have been added" + ); + + for topic_hash in topic_hashes[..3].iter() { + let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone(); + assert!( + topic_peers == peers[..2].to_vec(), + "Two peers should be added to the first three topics" + ); + } + + // Peer 0 unsubscribes from the first topic + + gs.handle_received_subscriptions( + &vec![GossipsubSubscription { + action: GossipsubSubscriptionAction::Unsubscribe, + topic_hash: topic_hashes[0].clone(), + }], + &peers[0], + ); + + let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); + assert!( + peer_topics == topic_hashes[1..3].to_vec(), + "Peer should be subscribed to two topics" + ); + + let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment + assert!( + topic_peers == peers[1..2].to_vec(), + "Only the second peers should be in the first topic" + ); + } + + #[test] + /// Test Gossipsub.get_random_peers() function + fn test_get_random_peers() { + // generate a default GossipsubConfig + let gs_config = GossipsubConfig::default(); + // create a gossipsub struct + let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config); + + // create a topic and fill it with some peers + let topic_hash = Topic::new("Test".into()).no_hash().clone(); + let mut peers = vec![]; + for _ in 0..20 { + peers.push(PeerId::random()) + } + + gs.topic_peers.insert(topic_hash.clone(), peers.clone()); + + let random_peers = + Gossipsub::::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| true }); + assert!(random_peers.len() == 5, "Expected 5 peers to be returned"); + let random_peers = + Gossipsub::::get_random_peers(&gs.topic_peers, &topic_hash, 30, { |_| true }); + assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); + assert!(random_peers == peers, "Expected no shuffling"); + let random_peers = + Gossipsub::::get_random_peers(&gs.topic_peers, &topic_hash, 20, { |_| true }); + assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); + assert!(random_peers == peers, "Expected no shuffling"); + let random_peers = + 
Gossipsub::::get_random_peers(&gs.topic_peers, &topic_hash, 0, { |_| true }); + assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); + // test the filter + let random_peers = + Gossipsub::::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| false }); + assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); + let random_peers = + Gossipsub::::get_random_peers(&gs.topic_peers, &topic_hash, 10, { + |peer| peers.contains(peer) + }); + assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); + } + + /// Tests that the correct message is sent when a peer asks for a message in our cache. + #[test] + fn test_handle_iwant_msg_cached() { + let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); + + let id = gs.config.message_id_fn; + + let message = GossipsubMessage { + source: peers[11].clone(), + data: vec![1, 2, 3, 4], + sequence_number: 1u64, + topics: Vec::new(), + }; + let msg_id = id(&message); + gs.mcache.put(message.clone()); + + gs.handle_iwant(&peers[7], vec![msg_id.clone()]); + + // the messages we are sending + let sent_messages = gs + .events + .iter() + .fold(vec![], |mut collected_messages, e| match e { + NetworkBehaviourAction::SendEvent { peer_id: _, event } => { + for c in &event.messages { + collected_messages.push(c.clone()) + } + collected_messages + } + _ => collected_messages, + }); + + assert!( + sent_messages.iter().any(|msg| id(msg) == msg_id), + "Expected the cached message to be sent to an IWANT peer" + ); + } + + /// Tests that messages are sent correctly depending on the shifting of the message cache. + #[test] + fn test_handle_iwant_msg_cached_shifted() { + let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); + + let id = gs.config.message_id_fn; + // perform 10 memshifts and check that it leaves the cache + for shift in 1..10 { + let message = GossipsubMessage { + source: peers[11].clone(), + data: vec![1, 2, 3, 4], + sequence_number: shift, + topics: Vec::new(), + }; + let msg_id = id(&message); + gs.mcache.put(message.clone()); + for _ in 0..shift { + gs.mcache.shift(); + } + + gs.handle_iwant(&peers[7], vec![msg_id.clone()]); + + // is the message is being sent? 
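The assertions that follow hinge on the history window of the message cache introduced later in this change: a message survives at most `history_length` shifts. Since `MessageCache` is private to the crate, the following sketch is written as if it were an additional case inside the existing `#[cfg(test)] mod tests` of `mcache.rs` (where `use super::*;` brings `MessageCache` and `GossipsubMessage` into scope).

```rust
#[test]
fn cached_message_expires_after_history_length_shifts() {
    use libp2p_core::PeerId;

    // history capacity 5: a message survives at most 5 shifts.
    let mut cache = MessageCache::new_default(3, 5);
    let message = GossipsubMessage {
        source: PeerId::random(),
        data: vec![1, 2, 3],
        sequence_number: 42,
        topics: Vec::new(),
    };
    let id = (cache.msg_id)(&message);
    cache.put(message);

    for _ in 0..4 {
        cache.shift();
    }
    assert!(cache.get(&id).is_some(), "still within the 5-slot history");

    cache.shift(); // the fifth shift drops the oldest history slot
    assert!(cache.get(&id).is_none(), "message evicted with the last slot");
}
```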
+ let message_exists = gs.events.iter().any(|e| match e { + NetworkBehaviourAction::SendEvent { peer_id: _, event } => { + event.messages.iter().any(|msg| id(msg) == msg_id) + } + _ => false, + }); + // default history_length is 5, expect no messages after shift > 5 + if shift < 5 { + assert!( + message_exists, + "Expected the cached message to be sent to an IWANT peer before 5 shifts" + ); + } else { + assert!( + !message_exists, + "Expected the cached message to not be sent to an IWANT peer after 5 shifts" + ); + } + } + } + + #[test] + // tests that an event is not created when a peers asks for a message not in our cache + fn test_handle_iwant_msg_not_cached() { + let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); + + let events_before = gs.events.len(); + gs.handle_iwant(&peers[7], vec![MessageId(String::from("unknown id"))]); + let events_after = gs.events.len(); + + assert_eq!( + events_before, events_after, + "Expected event count to stay the same" + ); + } + + #[test] + // tests that an event is created when a peer shares that it has a message we want + fn test_handle_ihave_subscribed_and_msg_not_cached() { + let (mut gs, peers, topic_hashes) = + build_and_inject_nodes(20, vec![String::from("topic1")], true); + + gs.handle_ihave( + &peers[7], + vec![( + topic_hashes[0].clone(), + vec![MessageId(String::from("unknown id"))], + )], + ); + + // check that we sent an IWANT request for `unknown id` + let iwant_exists = match gs.control_pool.get(&peers[7]) { + Some(controls) => controls.iter().any(|c| match c { + GossipsubControlAction::IWant { message_ids } => message_ids + .iter() + .any(|m| *m.0 == String::from("unknown id")), + _ => false, + }), + _ => false, + }; + + assert!( + iwant_exists, + "Expected to send an IWANT control message for unkown message id" + ); + } + + #[test] + // tests that an event is not created when a peer shares that it has a message that + // we already have + fn test_handle_ihave_subscribed_and_msg_cached() { + let (mut gs, peers, topic_hashes) = + build_and_inject_nodes(20, vec![String::from("topic1")], true); + + let msg_id = MessageId(String::from("known id")); + gs.received.put(msg_id.clone(), ()); + + let events_before = gs.events.len(); + gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]); + let events_after = gs.events.len(); + + assert_eq!( + events_before, events_after, + "Expected event count to stay the same" + ) + } + + #[test] + // test that an event is not created when a peer shares that it has a message in + // a topic that we are not subscribed to + fn test_handle_ihave_not_subscribed() { + let (mut gs, peers, _) = build_and_inject_nodes(20, vec![], true); + + let events_before = gs.events.len(); + gs.handle_ihave( + &peers[7], + vec![( + TopicHash::from_raw(String::from("unsubscribed topic")), + vec![MessageId(String::from("irrelevant id"))], + )], + ); + let events_after = gs.events.len(); + + assert_eq!( + events_before, events_after, + "Expected event count to stay the same" + ) + } + + #[test] + // tests that a peer is added to our mesh when we are both subscribed + // to the same topic + fn test_handle_graft_is_subscribed() { + let (mut gs, peers, topic_hashes) = + build_and_inject_nodes(20, vec![String::from("topic1")], true); + + gs.handle_graft(&peers[7], topic_hashes.clone()); + + assert!( + gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + "Expected peer to have been added to mesh" + ); + } + + #[test] + // tests that a peer is not added to our mesh when they are subscribed 
to + // a topic that we are not + fn test_handle_graft_is_not_subscribed() { + let (mut gs, peers, topic_hashes) = + build_and_inject_nodes(20, vec![String::from("topic1")], true); + + gs.handle_graft( + &peers[7], + vec![TopicHash::from_raw(String::from("unsubscribed topic"))], + ); + + assert!( + gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + "Expected peer to have been added to mesh" + ); + } + + #[test] + // tests multiple topics in a single graft message + fn test_handle_graft_multiple_topics() { + let topics: Vec = vec!["topic1", "topic2", "topic3", "topic4"] + .iter() + .map(|&t| String::from(t)) + .collect(); + + let (mut gs, peers, topic_hashes) = build_and_inject_nodes(20, topics.clone(), true); + + let mut their_topics = topic_hashes.clone(); + // their_topics = [topic1, topic2, topic3] + // our_topics = [topic1, topic2, topic4] + their_topics.pop(); + gs.leave(&their_topics[2]); + + gs.handle_graft(&peers[7], their_topics.clone()); + + for i in 0..2 { + assert!( + gs.mesh.get(&topic_hashes[i]).unwrap().contains(&peers[7]), + "Expected peer to be in the mesh for the first 2 topics" + ); + } + + assert!( + gs.mesh.get(&topic_hashes[2]).is_none(), + "Expected the second topic to not be in the mesh" + ); + } + + #[test] + // tests that a peer is removed from our mesh + fn test_handle_prune_peer_in_mesh() { + let (mut gs, peers, topic_hashes) = + build_and_inject_nodes(20, vec![String::from("topic1")], true); + + // insert peer into our mesh for 'topic1' + gs.mesh.insert(topic_hashes[0].clone(), peers.clone()); + assert!( + gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + "Expected peer to be in mesh" + ); + + gs.handle_prune(&peers[7], topic_hashes.clone()); + assert!( + !gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + "Expected peer to be removed from mesh" + ); + } +} diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs new file mode 100644 index 00000000000..5a715848c39 --- /dev/null +++ b/protocols/gossipsub/src/config.rs @@ -0,0 +1,251 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::protocol::{GossipsubMessage, MessageId}; +use std::borrow::Cow; +use std::time::Duration; + +/// If the `no_source_id` flag is set, the IDENTITY_SOURCE value is used as the source of the +/// packet. +pub const IDENTITY_SOURCE: [u8; 3] = [0, 1, 0]; + +/// Configuration parameters that define the performance of the gossipsub network. 
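The default `message_id_fn` (see the `Default` impl further down and the crate-level docs) concatenates the publisher's base58-encoded `PeerId` with the message's sequence number. A small sketch of that scheme, using only items exported by this crate:

```rust
use libp2p::gossipsub::{GossipsubConfig, GossipsubMessage};
use libp2p_core::PeerId;

fn main() {
    let config = GossipsubConfig::default();
    let source = PeerId::random();
    let message = GossipsubMessage {
        source: source.clone(),
        data: Vec::new(),
        sequence_number: 7,
        topics: Vec::new(),
    };
    // Default id = base58(source PeerId) followed by the decimal sequence number.
    let id = (config.message_id_fn)(&message);
    assert_eq!(id.0, format!("{}{}", source.to_base58(), 7));
}
```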
+#[derive(Clone)] +pub struct GossipsubConfig { + /// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`). + pub protocol_id: Cow<'static, [u8]>, + + // Overlay network parameters. + /// Number of heartbeats to keep in the `memcache` (default is 5). + pub history_length: usize, + + /// Number of past heartbeats to gossip about (default is 3). + pub history_gossip: usize, + + /// Target number of peers for the mesh network (D in the spec, default is 6). + pub mesh_n: usize, + + /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4). + pub mesh_n_low: usize, + + /// Maximum number of peers in mesh network before removing some (D_high in the spec, default + /// is 12). + pub mesh_n_high: usize, + + /// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6). + pub gossip_lazy: usize, + + /// Initial delay in each heartbeat (default is 5 seconds). + pub heartbeat_initial_delay: Duration, + + /// Time between each heartbeat (default is 1 second). + pub heartbeat_interval: Duration, + + /// Time to live for fanout peers (default is 60 seconds). + pub fanout_ttl: Duration, + + /// The maximum byte size for each gossip (default is 2048 bytes). + pub max_transmit_size: usize, + + /// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false). + pub hash_topics: bool, + + /// When set, all published messages will have a 0 source `PeerId` (default is false). + pub no_source_id: bool, + + /// When set to `true`, prevents automatic forwarding of all received messages. This setting + /// allows a user to validate the messages before propagating them to their peers. If set to + /// true, the user must manually call `propagate_message()` on the behaviour to forward message + /// once validated (default is false). + pub manual_propagation: bool, + + /// A user-defined function allowing the user to specify the message id of a gossipsub message. + /// The default value is to concatenate the source peer id with a sequence number. Setting this + /// parameter allows the user to address packets arbitrarily. One example is content based + /// addressing, where this function may be set to `hash(message)`. This would prevent messages + /// of the same content from being duplicated. + /// + /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as + /// the message id. 
+ pub message_id_fn: fn(&GossipsubMessage) -> MessageId, +} + +impl Default for GossipsubConfig { + fn default() -> GossipsubConfig { + GossipsubConfig { + protocol_id: Cow::Borrowed(b"/meshsub/1.0.0"), + history_length: 5, + history_gossip: 3, + mesh_n: 6, + mesh_n_low: 4, + mesh_n_high: 12, + gossip_lazy: 6, // default to mesh_n + heartbeat_initial_delay: Duration::from_secs(5), + heartbeat_interval: Duration::from_secs(1), + fanout_ttl: Duration::from_secs(60), + max_transmit_size: 2048, + hash_topics: false, // default compatibility with floodsub + no_source_id: false, + manual_propagation: false, + message_id_fn: |message| { + // default message id is: source + sequence number + let mut source_string = message.source.to_base58(); + source_string.push_str(&message.sequence_number.to_string()); + MessageId(source_string) + }, + } + } +} + +pub struct GossipsubConfigBuilder { + config: GossipsubConfig, +} + +impl Default for GossipsubConfigBuilder { + fn default() -> GossipsubConfigBuilder { + GossipsubConfigBuilder { + config: GossipsubConfig::default(), + } + } +} + +impl GossipsubConfigBuilder { + // set default values + pub fn new() -> GossipsubConfigBuilder { + GossipsubConfigBuilder::default() + } + + pub fn protocol_id(&mut self, protocol_id: impl Into>) -> &mut Self { + self.config.protocol_id = protocol_id.into(); + self + } + + pub fn history_length(&mut self, history_length: usize) -> &mut Self { + assert!( + history_length >= self.config.history_gossip, + "The history_length must be greater than or equal to the history_gossip length" + ); + self.config.history_length = history_length; + self + } + + pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self { + assert!( + self.config.history_length >= history_gossip, + "The history_length must be greater than or equal to the history_gossip length" + ); + self.config.history_gossip = history_gossip; + self + } + + pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self { + assert!( + self.config.mesh_n_low <= mesh_n && mesh_n <= self.config.mesh_n_high, + "The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high" + ); + self.config.mesh_n = mesh_n; + self + } + + pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self { + assert!( + mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= self.config.mesh_n_high, + "The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high" + ); + self.config.mesh_n_low = mesh_n_low; + self + } + + pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self { + assert!( + self.config.mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= mesh_n_high, + "The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high" + ); + self.config.mesh_n_high = mesh_n_high; + self + } + + pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self { + self.config.gossip_lazy = gossip_lazy; + self + } + + pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self { + self.config.heartbeat_initial_delay = heartbeat_initial_delay; + self + } + pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self { + self.config.heartbeat_interval = heartbeat_interval; + self + } + pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self { + self.config.fanout_ttl = fanout_ttl; + self + } + pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self { + self.config.max_transmit_size = max_transmit_size; + self + } + + pub fn hash_topics(&mut self) -> &mut Self { + self.config.hash_topics = 
true; + self + } + + pub fn no_source_id(&mut self) -> &mut Self { + self.config.no_source_id = true; + self + } + + pub fn manual_propagation(&mut self) -> &mut Self { + self.config.manual_propagation = true; + self + } + + pub fn message_id_fn(&mut self, id_fn: fn(&GossipsubMessage) -> MessageId) -> &mut Self { + self.config.message_id_fn = id_fn; + self + } + + pub fn build(&self) -> GossipsubConfig { + self.config.clone() + } +} + +impl std::fmt::Debug for GossipsubConfig { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let mut builder = f.debug_struct("GossipsubConfig"); + let _ = builder.field("protocol_id", &self.protocol_id); + let _ = builder.field("history_length", &self.history_length); + let _ = builder.field("history_gossip", &self.history_gossip); + let _ = builder.field("mesh_n", &self.mesh_n); + let _ = builder.field("mesh_n_low", &self.mesh_n_low); + let _ = builder.field("mesh_n_high", &self.mesh_n_high); + let _ = builder.field("gossip_lazy", &self.gossip_lazy); + let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay); + let _ = builder.field("heartbeat_interval", &self.heartbeat_interval); + let _ = builder.field("fanout_ttl", &self.fanout_ttl); + let _ = builder.field("max_transmit_size", &self.max_transmit_size); + let _ = builder.field("hash_topics", &self.hash_topics); + let _ = builder.field("no_source_id", &self.no_source_id); + let _ = builder.field("manual_propagation", &self.manual_propagation); + builder.finish() + } +} diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs new file mode 100644 index 00000000000..adafe2dc0c1 --- /dev/null +++ b/protocols/gossipsub/src/handler.rs @@ -0,0 +1,359 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::behaviour::GossipsubRpc; +use crate::protocol::{GossipsubCodec, ProtocolConfig}; +use futures::prelude::*; +use futures_codec::Framed; +use libp2p_core::upgrade::{InboundUpgrade, Negotiated, OutboundUpgrade}; +use libp2p_swarm::protocols_handler::{ + KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use log::{debug, trace, warn}; +use smallvec::SmallVec; +use std::{ + borrow::Cow, + io, + pin::Pin, + task::{Context, Poll}, +}; + +/// Protocol Handler that manages a single long-lived substream with a peer. 
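A sketch of how the builder above can override a handful of parameters while keeping the remaining defaults. The specific values are arbitrary examples, constrained only by the builder's internal assertions (`mesh_n_low <= mesh_n <= mesh_n_high` and `history_gossip <= history_length`).

```rust
use libp2p::gossipsub::GossipsubConfigBuilder;
use std::time::Duration;

fn main() {
    let config = GossipsubConfigBuilder::new()
        .mesh_n_high(16)                      // allow larger meshes before pruning
        .fanout_ttl(Duration::from_secs(30))  // drop idle fanout peers sooner
        .max_transmit_size(1_048_576)         // 1 MiB frames instead of the 2 KiB default
        .manual_propagation()                 // validate messages before forwarding
        .build();

    // Untouched fields keep their defaults.
    assert_eq!(config.mesh_n, 6);
}
```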
+pub struct GossipsubHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Upgrade configuration for the gossipsub protocol. + listen_protocol: SubstreamProtocol, + + /// The single long-lived outbound substream. + outbound_substream: Option>, + + /// The single long-lived inbound substream. + inbound_substream: Option>, + + /// Queue of values that we want to send to the remote. + send_queue: SmallVec<[GossipsubRpc; 16]>, + + /// Flag determining whether to maintain the connection to the peer. + keep_alive: KeepAlive, +} + +/// State of the inbound substream, opened either by us or by the remote. +enum InboundSubstreamState +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Waiting for a message from the remote. The idle state for an inbound substream. + WaitingInput(Framed, GossipsubCodec>), + /// The substream is being closed. + Closing(Framed, GossipsubCodec>), + /// An error occurred during processing. + Poisoned, +} + +/// State of the outbound substream, opened either by us or by the remote. +enum OutboundSubstreamState +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Waiting for the user to send a message. The idle state for an outbound substream. + WaitingOutput(Framed, GossipsubCodec>), + /// Waiting to send a message to the remote. + PendingSend(Framed, GossipsubCodec>, GossipsubRpc), + /// Waiting to flush the substream so that the data arrives to the remote. + PendingFlush(Framed, GossipsubCodec>), + /// The substream is being closed. Used by either substream. + _Closing(Framed, GossipsubCodec>), + /// An error occurred during processing. + Poisoned, +} + +impl GossipsubHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// Builds a new `GossipsubHandler`. + pub fn new(protocol_id: impl Into>, max_transmit_size: usize) -> Self { + GossipsubHandler { + listen_protocol: SubstreamProtocol::new(ProtocolConfig::new( + protocol_id, + max_transmit_size, + )), + inbound_substream: None, + outbound_substream: None, + send_queue: SmallVec::new(), + keep_alive: KeepAlive::Yes, + } + } +} + +impl Default for GossipsubHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + fn default() -> Self { + GossipsubHandler { + listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()), + inbound_substream: None, + outbound_substream: None, + send_queue: SmallVec::new(), + keep_alive: KeepAlive::Yes, + } + } +} + +impl ProtocolsHandler for GossipsubHandler +where + TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type InEvent = GossipsubRpc; + type OutEvent = GossipsubRpc; + type Error = io::Error; + type Substream = TSubstream; + type InboundProtocol = ProtocolConfig; + type OutboundProtocol = ProtocolConfig; + type OutboundOpenInfo = GossipsubRpc; + + fn listen_protocol(&self) -> SubstreamProtocol { + self.listen_protocol.clone() + } + + fn inject_fully_negotiated_inbound( + &mut self, + substream: >>::Output, + ) { + // new inbound substream. Replace the current one, if it exists. + trace!("New inbound substream request"); + self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); + } + + fn inject_fully_negotiated_outbound( + &mut self, + substream: >>::Output, + message: Self::OutboundOpenInfo, + ) { + // Should never establish a new outbound substream if one already exists. + // If this happens, an outbound message is not sent. 
+ if self.outbound_substream.is_some() { + warn!("Established an outbound substream with one already available"); + // Add the message back to the send queue + self.send_queue.push(message); + } else { + self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); + } + } + + fn inject_event(&mut self, message: GossipsubRpc) { + self.send_queue.push(message); + } + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + // Ignore upgrade errors for now. + // If a peer doesn't support this protocol, this will just ignore them, but not disconnect + // them. + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + // determine if we need to create the stream + if !self.send_queue.is_empty() && self.outbound_substream.is_none() { + let message = self.send_queue.remove(0); + self.send_queue.shrink_to_fit(); + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: self.listen_protocol.clone(), + info: message, + }); + } + + loop { + match std::mem::replace( + &mut self.inbound_substream, + Some(InboundSubstreamState::Poisoned), + ) { + // inbound idle state + Some(InboundSubstreamState::WaitingInput(mut substream)) => { + match substream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(message))) => { + self.inbound_substream = + Some(InboundSubstreamState::WaitingInput(substream)); + return Poll::Ready(ProtocolsHandlerEvent::Custom(message)); + } + Poll::Ready(Some(Err(e))) => { + debug!("Inbound substream error while awaiting input: {:?}", e); + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + } + // peer closed the stream + Poll::Ready(None) => { + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + } + Poll::Pending => { + self.inbound_substream = + Some(InboundSubstreamState::WaitingInput(substream)); + break; + } + } + } + Some(InboundSubstreamState::Closing(mut substream)) => { + match Sink::poll_close(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + self.inbound_substream = None; + if self.outbound_substream.is_none() { + self.keep_alive = KeepAlive::No; + } + break; + } + Poll::Ready(Err(e)) => { + debug!("Inbound substream error while closing: {:?}", e); + return Poll::Ready(ProtocolsHandlerEvent::Close(io::Error::new( + io::ErrorKind::BrokenPipe, + "Failed to close stream", + ))); + } + Poll::Pending => { + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + break; + } + } + } + None => { + self.inbound_substream = None; + break; + } + Some(InboundSubstreamState::Poisoned) => { + panic!("Error occurred during inbound stream processing") + } + } + } + + loop { + match std::mem::replace( + &mut self.outbound_substream, + Some(OutboundSubstreamState::Poisoned), + ) { + // outbound idle state + Some(OutboundSubstreamState::WaitingOutput(substream)) => { + if !self.send_queue.is_empty() { + let message = self.send_queue.remove(0); + self.send_queue.shrink_to_fit(); + self.outbound_substream = + Some(OutboundSubstreamState::PendingSend(substream, message)); + } else { + self.outbound_substream = + Some(OutboundSubstreamState::WaitingOutput(substream)); + break; + } + } + Some(OutboundSubstreamState::PendingSend(mut substream, message)) => { + match Sink::poll_ready(Pin::new(&mut 
substream), cx) { + Poll::Ready(Ok(())) => { + match Sink::start_send(Pin::new(&mut substream), message) { + Ok(()) => { + self.outbound_substream = + Some(OutboundSubstreamState::PendingFlush(substream)) + } + Err(e) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(e)); + } + } + } + Poll::Ready(Err(e)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(e)); + } + Poll::Pending => { + self.outbound_substream = + Some(OutboundSubstreamState::PendingSend(substream, message)); + break; + } + } + } + Some(OutboundSubstreamState::PendingFlush(mut substream)) => { + match Sink::poll_flush(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + self.outbound_substream = + Some(OutboundSubstreamState::WaitingOutput(substream)) + } + Poll::Ready(Err(e)) => return Poll::Ready(ProtocolsHandlerEvent::Close(e)), + Poll::Pending => { + self.outbound_substream = + Some(OutboundSubstreamState::PendingFlush(substream)); + break; + } + } + } + // Currently never used - manual shutdown may implement this in the future + Some(OutboundSubstreamState::_Closing(mut substream)) => { + match Sink::poll_close(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + self.outbound_substream = None; + if self.inbound_substream.is_none() { + self.keep_alive = KeepAlive::No; + } + break; + } + Poll::Ready(Err(e)) => { + debug!("Outbound substream error while closing: {:?}", e); + return Poll::Ready(ProtocolsHandlerEvent::Close(io::Error::new( + io::ErrorKind::BrokenPipe, + "Failed to close outbound substream", + ))); + } + Poll::Pending => { + self.outbound_substream = + Some(OutboundSubstreamState::_Closing(substream)); + break; + } + } + } + None => { + self.outbound_substream = None; + break; + } + Some(OutboundSubstreamState::Poisoned) => { + panic!("Error occurred during outbound stream processing") + } + } + } + + Poll::Pending + } +} diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs new file mode 100644 index 00000000000..e0efa955714 --- /dev/null +++ b/protocols/gossipsub/src/lib.rs @@ -0,0 +1,153 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Gossipsub is a P2P pubsub (publish/subscription) routing layer designed to extend upon +//! flooodsub and meshsub routing protocols. +//! +//! # Overview +//! +//! *Note: The gossipsub protocol specifications +//! (https://github.com/libp2p/specs/tree/master/pubsub/gossipsub) provide an outline for the +//! routing protocol. 
They should be consulted for further detail.* +//! +//! Gossipsub is a blend of meshsub for data and randomsub for mesh metadata. It provides bounded +//! degree and amplification factor with the meshsub construction and augments it using gossip +//! propagation of metadata with the randomsub technique. +//! +//! The router maintains an overlay mesh network of peers on which to efficiently send messages and +//! metadata. Peers use control messages to broadcast and request known messages and +//! subscribe/unsubscribe from topics in the mesh network. +//! +//! # Important Discrepancies +//! +//! This section outlines the current implementation's potential discrepancies from that of other +//! implementations, due to undefined elements in the current specification. +//! +//! - **Topics** - In gossipsub, topics configurable by the `hash_topics` configuration parameter. +//! Topics are of type `TopicHash`. The current go implementation uses raw utf-8 strings, and this +//! is default configuration in rust-libp2p. Topics can be hashed (SHA256 hashed then base64 +//! encoded) by setting the `hash_topics` configuration parameter to true. +//! +//! - **Sequence Numbers** - A message on the gossipsub network is identified by the source +//! `PeerId` and a nonce (sequence number) of the message. The sequence numbers in this +//! implementation are sent as raw bytes across the wire. They are 64-bit big-endian unsigned +//! integers. They are chosen at random in this implementation of gossipsub, but are sequential in +//! the current go implementation. +//! +//! # Using Gossipsub +//! +//! ## GossipsubConfig +//! +//! The [`GossipsubConfig`] struct specifies various network performance/tuning configuration +//! parameters. Specifically it specifies: +//! +//! [`GossipsubConfig`]: struct.GossipsubConfig.html +//! +//! - `protocol_id` - The protocol id that this implementation will accept connections on. +//! - `history_length` - The number of heartbeats which past messages are kept in cache (default: 5). +//! - `history_gossip` - The number of past heartbeats that the node will send gossip metadata +//! about (default: 3). +//! - `mesh_n` - The target number of peers store in the local mesh network. +//! (default: 6). +//! - `mesh_n_low` - The minimum number of peers in the local mesh network before. +//! trying to add more peers to the mesh from the connected peer pool (default: 4). +//! - `mesh_n_high` - The maximum number of peers in the local mesh network before removing peers to +//! reach `mesh_n` peers (default: 12). +//! - `gossip_lazy` - The number of peers that the local node will gossip to during a heartbeat (default: `mesh_n` = 6). +//! - `heartbeat_initial_delay - The initial time delay before starting the first heartbeat (default: 5 seconds). +//! - `heartbeat_interval` - The time between each heartbeat (default: 1 second). +//! - `fanout_ttl` - The fanout time to live time period. The timeout required before removing peers from the fanout +//! for a given topic (default: 1 minute). +//! - `max_transmit_size` - This sets the maximum transmission size for total gossipsub messages on the network. +//! - `hash_topics` - Whether to hash the topics using base64(SHA256(topic)) or to leave as plain utf-8 strings. +//! - `manual_propagation` - Whether gossipsub should immediately forward received messages on the +//! network. For applications requiring message validation, this should be set to false, then the +//! 
application should call `propagate_message(message_id, propagation_source)` once validated, to +//! propagate the message to peers. +//! +//! This struct implements the `Default` trait and can be initialised via +//! `GossipsubConfig::default()`. +//! +//! +//! ## Gossipsub +//! +//! The [`Gossipsub`] struct implements the `NetworkBehaviour` trait allowing it to act as the +//! routing behaviour in a `Swarm`. This struct requires an instance of `PeerId` and +//! [`GossipsubConfig`]. +//! +//! [`Gossipsub`]: struct.Gossipsub.html + +//! ## Example +//! +//! An example of initialising a gossipsub compatible swarm: +//! +//! ```ignore +//! #extern crate libp2p; +//! #extern crate futures; +//! #extern crate tokio; +//! #use libp2p::gossipsub::GossipsubEvent; +//! #use libp2p::{gossipsub, secio, +//! # tokio_codec::{FramedRead, LinesCodec}, +//! #}; +//! let local_key = secio::SecioKeyPair::ed25519_generated().unwrap(); +//! let local_pub_key = local_key.to_public_key(); +//! +//! // Set up an encrypted TCP Transport over the Mplex and Yamux protocols +//! let transport = libp2p::build_development_transport(local_key); +//! +//! // Create a Floodsub/Gossipsub topic +//! let topic = libp2p::floodsub::TopicBuilder::new("example").build(); +//! +//! // Create a Swarm to manage peers and events +//! let mut swarm = { +//! // set default parameters for gossipsub +//! let gossipsub_config = gossipsub::GossipsubConfig::default(); +//! // build a gossipsub network behaviour +//! let mut gossipsub = +//! gossipsub::Gossipsub::new(local_pub_key.clone().into_peer_id(), gossipsub_config); +//! gossipsub.subscribe(topic.clone()); +//! libp2p::Swarm::new( +//! transport, +//! gossipsub, +//! libp2p::core::topology::MemoryTopology::empty(local_pub_key), +//! ) +//! }; +//! +//! // Listen on all interfaces and whatever port the OS assigns +//! let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); +//! println!("Listening on {:?}", addr); +//! ``` + +pub mod protocol; + +mod behaviour; +mod config; +mod handler; +mod mcache; +mod topic; + +mod rpc_proto { + include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); +} + +pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc}; +pub use self::config::{GossipsubConfig, GossipsubConfigBuilder}; +pub use self::protocol::{GossipsubMessage, MessageId}; +pub use self::topic::{Topic, TopicHash}; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs new file mode 100644 index 00000000000..8e74308ca88 --- /dev/null +++ b/protocols/gossipsub/src/mcache.rs @@ -0,0 +1,314 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate fnv; + +use crate::protocol::{GossipsubMessage, MessageId}; +use crate::topic::TopicHash; +use std::collections::HashMap; + +/// CacheEntry stored in the history. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CacheEntry { + mid: MessageId, + topics: Vec, +} + +/// MessageCache struct holding history of messages. +#[derive(Clone)] +pub struct MessageCache { + msgs: HashMap, + history: Vec>, + gossip: usize, + msg_id: fn(&GossipsubMessage) -> MessageId, +} + +/// Implementation of the MessageCache. +impl MessageCache { + pub fn new( + gossip: usize, + history_capacity: usize, + msg_id: fn(&GossipsubMessage) -> MessageId, + ) -> MessageCache { + MessageCache { + gossip, + msgs: HashMap::default(), + history: vec![Vec::new(); history_capacity], + msg_id, + } + } + + /// Creates a `MessageCache` with a default message id function. + #[allow(dead_code)] + pub fn new_default(gossip: usize, history_capacity: usize) -> MessageCache { + let default_id = |message: &GossipsubMessage| { + // default message id is: source + sequence number + let mut source_string = message.source.to_base58(); + source_string.push_str(&message.sequence_number.to_string()); + MessageId(source_string) + }; + MessageCache { + gossip, + msgs: HashMap::default(), + history: vec![Vec::new(); history_capacity], + msg_id: default_id, + } + } + + /// Put a message into the memory cache + pub fn put(&mut self, msg: GossipsubMessage) { + let message_id = (self.msg_id)(&msg); + let cache_entry = CacheEntry { + mid: message_id.clone(), + topics: msg.topics.clone(), + }; + + self.msgs.insert(message_id, msg); + + self.history[0].push(cache_entry); + } + + /// Get a message with `message_id` + pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessage> { + self.msgs.get(message_id) + } + + /// Get a list of GossipIds for a given topic + pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec { + self.history[..self.gossip] + .iter() + .fold(vec![], |mut current_entries, entries| { + // search for entries with desired topic + let mut found_entries: Vec = entries + .iter() + .filter_map(|entry| { + if entry.topics.iter().any(|t| t == topic) { + Some(entry.mid.clone()) + } else { + None + } + }) + .collect(); + + // generate the list + current_entries.append(&mut found_entries); + current_entries + }) + } + + /// Shift the history array down one and delete messages associated with the + /// last entry + pub fn shift(&mut self) { + for entry in self.history.pop().expect("history is always > 1") { + self.msgs.remove(&entry.mid); + } + + // Insert an empty vec in position 0 + self.history.insert(0, Vec::new()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{Topic, TopicHash}; + use libp2p_core::PeerId; + + fn gen_testm(x: u64, topics: Vec) -> GossipsubMessage { + let u8x: u8 = x as u8; + let source = PeerId::random(); + let data: Vec = vec![u8x]; + let sequence_number = x; + + let m = GossipsubMessage { + source, + data, + sequence_number, + topics, + }; + m + } + + #[test] + /// Test that the message cache can be created. 
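One behaviour of this cache worth spelling out: `get_gossip_ids` only inspects the first `gossip` history windows, so a message can still be retrievable with `get` while no longer being advertised via IHAVE gossip. A sketch, written as if it were an additional case in the tests module below:

```rust
#[test]
fn gossip_ids_only_cover_recent_windows() {
    let topic = Topic::new("t".into()).no_hash();
    // gossip window = 2, history capacity = 5.
    let mut cache = MessageCache::new_default(2, 5);

    let message = GossipsubMessage {
        source: libp2p_core::PeerId::random(),
        data: vec![0],
        sequence_number: 1,
        topics: vec![topic.clone()],
    };
    let id = (cache.msg_id)(&message);
    cache.put(message);

    cache.shift();
    cache.shift(); // message now sits in history[2], outside the gossip window

    assert!(cache.get(&id).is_some(), "still cached");
    assert!(
        !cache.get_gossip_ids(&topic).contains(&id),
        "no longer advertised in IHAVE gossip"
    );
}
```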
+ fn test_new_cache() { + let default_id = |message: &GossipsubMessage| { + // default message id is: source + sequence number + let mut source_string = message.source.to_base58(); + source_string.push_str(&message.sequence_number.to_string()); + MessageId(source_string) + }; + let x: usize = 3; + let mc = MessageCache::new(x, 5, default_id); + + assert_eq!(mc.gossip, x); + } + + #[test] + /// Test you can put one message and get one. + fn test_put_get_one() { + let mut mc = MessageCache::new_default(10, 15); + + let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); + let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); + + let m = gen_testm(10, vec![topic1_hash, topic2_hash]); + + mc.put(m.clone()); + + assert!(mc.history[0].len() == 1); + + let fetched = mc.get(&(mc.msg_id)(&m)); + + assert_eq!(fetched.is_none(), false); + assert_eq!(fetched.is_some(), true); + + // Make sure it is the same fetched message + match fetched { + Some(x) => assert_eq!(*x, m), + _ => assert!(false), + } + } + + #[test] + /// Test attempting to 'get' with a wrong id. + fn test_get_wrong() { + let mut mc = MessageCache::new_default(10, 15); + + let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); + let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); + + let m = gen_testm(10, vec![topic1_hash, topic2_hash]); + + mc.put(m.clone()); + + // Try to get an incorrect ID + let wrong_id = MessageId(String::from("wrongid")); + let fetched = mc.get(&wrong_id); + assert_eq!(fetched.is_none(), true); + } + + #[test] + /// Test attempting to 'get' empty message cache. + fn test_get_empty() { + let mc = MessageCache::new_default(10, 15); + + // Try to get an incorrect ID + let wrong_string = MessageId(String::from("imempty")); + let fetched = mc.get(&wrong_string); + assert_eq!(fetched.is_none(), true); + } + + #[test] + /// Test adding a message with no topics. + fn test_no_topic_put() { + let mut mc = MessageCache::new_default(3, 5); + + // Build the message + let m = gen_testm(1, vec![]); + mc.put(m.clone()); + + let fetched = mc.get(&(mc.msg_id)(&m)); + + // Make sure it is the same fetched message + match fetched { + Some(x) => assert_eq!(*x, m), + _ => assert!(false), + } + } + + #[test] + /// Test shift mechanism. + fn test_shift() { + let mut mc = MessageCache::new_default(1, 5); + + let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); + let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); + + // Build the message + for i in 0..10 { + let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]); + mc.put(m.clone()); + } + + mc.shift(); + + // Ensure the shift occurred + assert!(mc.history[0].len() == 0); + assert!(mc.history[1].len() == 10); + + // Make sure no messages deleted + assert!(mc.msgs.len() == 10); + } + + #[test] + /// Test Shift with no additions. + fn test_empty_shift() { + let mut mc = MessageCache::new_default(1, 5); + + let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); + let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); + // Build the message + for i in 0..10 { + let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]); + mc.put(m.clone()); + } + + mc.shift(); + + // Ensure the shift occurred + assert!(mc.history[0].len() == 0); + assert!(mc.history[1].len() == 10); + + mc.shift(); + + assert!(mc.history[2].len() == 10); + assert!(mc.history[1].len() == 0); + assert!(mc.history[0].len() == 0); + } + + #[test] + /// Test shift to see if the last history messages are removed. 
+ fn test_remove_last_from_shift() { + let mut mc = MessageCache::new_default(4, 5); + + let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); + let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); + // Build the message + for i in 0..10 { + let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]); + mc.put(m.clone()); + } + + // Shift right until deleting messages + mc.shift(); + mc.shift(); + mc.shift(); + mc.shift(); + + assert_eq!(mc.history[mc.history.len() - 1].len(), 10); + + // Shift and delete the messages + mc.shift(); + assert_eq!(mc.history[mc.history.len() - 1].len(), 0); + assert_eq!(mc.history[0].len(), 0); + assert_eq!(mc.msgs.len(), 0); + } +} diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs new file mode 100644 index 00000000000..14a8c6ddd39 --- /dev/null +++ b/protocols/gossipsub/src/protocol.rs @@ -0,0 +1,399 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::behaviour::GossipsubRpc; +use crate::rpc_proto; +use crate::topic::TopicHash; +use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; +use bytes::BytesMut; +use futures::future; +use futures::prelude::*; +use futures_codec::{Decoder, Encoder, Framed}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; +use prost::Message as ProtobufMessage; +use std::{borrow::Cow, io, iter, pin::Pin}; +use unsigned_varint::codec; + +/// Implementation of the `ConnectionUpgrade` for the Gossipsub protocol. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + protocol_id: Cow<'static, [u8]>, + max_transmit_size: usize, +} + +impl Default for ProtocolConfig { + fn default() -> Self { + Self { + protocol_id: Cow::Borrowed(b"/meshsub/1.0.0"), + max_transmit_size: 2048, + } + } +} + +impl ProtocolConfig { + /// Builds a new `ProtocolConfig`. + /// Sets the maximum gossip transmission size. 
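`ProtocolConfig` is normally constructed internally by the connection handler from the `protocol_id` and `max_transmit_size` fields of `GossipsubConfig`, but it can also be built directly. The protocol id below is a hypothetical example value, not one defined by this crate.

```rust
use libp2p::gossipsub::protocol::ProtocolConfig;

fn main() {
    // Hypothetical application-specific protocol id and a 1 MiB frame limit.
    let _upgrade = ProtocolConfig::new(b"/my-app/meshsub/1.0.0".as_ref(), 1_048_576);

    // Equivalent to ProtocolConfig::new(b"/meshsub/1.0.0".as_ref(), 2048).
    let _default = ProtocolConfig::default();
}
```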
+ pub fn new( + protocol_id: impl Into>, + max_transmit_size: usize, + ) -> ProtocolConfig { + ProtocolConfig { + protocol_id: protocol_id.into(), + max_transmit_size, + } + } +} + +impl UpgradeInfo for ProtocolConfig { + type Info = Cow<'static, [u8]>; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_id.clone()) + } +} + +impl InboundUpgrade for ProtocolConfig +where + TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = Framed; + type Error = io::Error; + type Future = Pin> + Send>>; + + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let mut length_codec = codec::UviBytes::default(); + length_codec.set_max_len(self.max_transmit_size); + Box::pin(future::ok(Framed::new( + socket, + GossipsubCodec { length_codec }, + ))) + } +} + +impl OutboundUpgrade for ProtocolConfig +where + TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, +{ + type Output = Framed; + type Error = io::Error; + type Future = Pin> + Send>>; + + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let mut length_codec = codec::UviBytes::default(); + length_codec.set_max_len(self.max_transmit_size); + Box::pin(future::ok(Framed::new( + socket, + GossipsubCodec { length_codec }, + ))) + } +} + +/* Gossip codec for the framing */ + +pub struct GossipsubCodec { + /// Codec to encode/decode the Unsigned varint length prefix of the frames. + length_codec: codec::UviBytes, +} + +impl Encoder for GossipsubCodec { + type Item = GossipsubRpc; + type Error = io::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + // messages + let publish = item + .messages + .into_iter() + .map(|message| rpc_proto::Message { + from: Some(message.source.into_bytes()), + data: Some(message.data), + seqno: Some(message.sequence_number.to_be_bytes().to_vec()), + topic_ids: message + .topics + .into_iter() + .map(TopicHash::into_string) + .collect(), + }) + .collect::>(); + + // subscriptions + let subscriptions = item + .subscriptions + .into_iter() + .map(|sub| rpc_proto::rpc::SubOpts { + subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe), + topic_id: Some(sub.topic_hash.into_string()), + }) + .collect::>(); + + // control messages + let mut control = rpc_proto::ControlMessage { + ihave: Vec::new(), + iwant: Vec::new(), + graft: Vec::new(), + prune: Vec::new(), + }; + + let empty_control_msg = item.control_msgs.is_empty(); + + for action in item.control_msgs { + match action { + // collect all ihave messages + GossipsubControlAction::IHave { + topic_hash, + message_ids, + } => { + let rpc_ihave = rpc_proto::ControlIHave { + topic_id: Some(topic_hash.into_string()), + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }; + control.ihave.push(rpc_ihave); + } + GossipsubControlAction::IWant { message_ids } => { + let rpc_iwant = rpc_proto::ControlIWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }; + control.iwant.push(rpc_iwant); + } + GossipsubControlAction::Graft { topic_hash } => { + let rpc_graft = rpc_proto::ControlGraft { + topic_id: Some(topic_hash.into_string()), + }; + control.graft.push(rpc_graft); + } + GossipsubControlAction::Prune { topic_hash } => { + let rpc_prune = rpc_proto::ControlPrune { + topic_id: Some(topic_hash.into_string()), + }; + control.prune.push(rpc_prune); + } + } + } + + let rpc = rpc_proto::Rpc { + subscriptions, + publish, + control: if empty_control_msg { + 
None + } else { + Some(control) + }, + }; + + let mut buf = Vec::with_capacity(rpc.encoded_len()); + + rpc.encode(&mut buf) + .expect("Buffer has sufficient capacity"); + + // length prefix the protobuf message, ensuring the max limit is not hit + self.length_codec.encode(Bytes::from(buf), dst) + } +} + +impl Decoder for GossipsubCodec { + type Item = GossipsubRpc; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let packet = match self.length_codec.decode(src)? { + Some(p) => p, + None => return Ok(None), + }; + + let rpc = rpc_proto::Rpc::decode(&packet[..])?; + + let mut messages = Vec::with_capacity(rpc.publish.len()); + for publish in rpc.publish.into_iter() { + // ensure the sequence number is a u64 + let seq_no = publish.seqno.ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "sequence number was not provided", + ) + })?; + if seq_no.len() != 8 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "sequence number has an incorrect size", + )); + } + messages.push(GossipsubMessage { + source: PeerId::from_bytes(publish.from.unwrap_or_default()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?, + data: publish.data.unwrap_or_default(), + sequence_number: BigEndian::read_u64(&seq_no), + topics: publish + .topic_ids + .into_iter() + .map(TopicHash::from_raw) + .collect(), + }); + } + + let mut control_msgs = Vec::new(); + + if let Some(rpc_control) = rpc.control { + // Collect the gossipsub control messages + let ihave_msgs: Vec = rpc_control + .ihave + .into_iter() + .map(|ihave| GossipsubControlAction::IHave { + topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()), + message_ids: ihave + .message_ids + .into_iter() + .map(|x| MessageId(x)) + .collect::>(), + }) + .collect(); + + let iwant_msgs: Vec = rpc_control + .iwant + .into_iter() + .map(|iwant| GossipsubControlAction::IWant { + message_ids: iwant + .message_ids + .into_iter() + .map(|x| MessageId(x)) + .collect::>(), + }) + .collect(); + + let graft_msgs: Vec = rpc_control + .graft + .into_iter() + .map(|graft| GossipsubControlAction::Graft { + topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()), + }) + .collect(); + + let prune_msgs: Vec = rpc_control + .prune + .into_iter() + .map(|prune| GossipsubControlAction::Prune { + topic_hash: TopicHash::from_raw(prune.topic_id.unwrap_or_default()), + }) + .collect(); + + control_msgs.extend(ihave_msgs); + control_msgs.extend(iwant_msgs); + control_msgs.extend(graft_msgs); + control_msgs.extend(prune_msgs); + } + + Ok(Some(GossipsubRpc { + messages, + subscriptions: rpc + .subscriptions + .into_iter() + .map(|sub| GossipsubSubscription { + action: if Some(true) == sub.subscribe { + GossipsubSubscriptionAction::Subscribe + } else { + GossipsubSubscriptionAction::Unsubscribe + }, + topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), + }) + .collect(), + control_msgs, + })) + } +} + +/// A type for gossipsub message ids. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MessageId(pub String); + +impl std::fmt::Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Into for MessageId { + fn into(self) -> String { + self.0.into() + } +} + +/// A message received by the gossipsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct GossipsubMessage { + /// Id of the peer that published this message. 
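The decoder above rejects any `seqno` that is not exactly 8 bytes, and the encoder writes the sequence number with `to_be_bytes`. A tiny round-trip sketch of that wire representation:

```rust
use byteorder::{BigEndian, ByteOrder};

fn main() {
    // 64-bit sequence numbers travel as exactly 8 big-endian bytes on the wire.
    let sequence_number: u64 = 0x0102_0304_0506_0708;
    let wire = sequence_number.to_be_bytes().to_vec();
    assert_eq!(wire.len(), 8);
    assert_eq!(BigEndian::read_u64(&wire), sequence_number);
}
```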
+ pub source: PeerId, + + /// Content of the message. Its meaning is out of scope of this library. + pub data: Vec, + + /// A random sequence number. + pub sequence_number: u64, + + /// List of topics this message belongs to. + /// + /// Each message can belong to multiple topics at once. + pub topics: Vec, +} + +/// A subscription received by the gossipsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct GossipsubSubscription { + /// Action to perform. + pub action: GossipsubSubscriptionAction, + /// The topic from which to subscribe or unsubscribe. + pub topic_hash: TopicHash, +} + +/// Action that a subscription wants to perform. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum GossipsubSubscriptionAction { + /// The remote wants to subscribe to the given topic. + Subscribe, + /// The remote wants to unsubscribe from the given topic. + Unsubscribe, +} + +/// A Control message received by the gossipsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum GossipsubControlAction { + /// Node broadcasts known messages per topic - IHave control message. + IHave { + /// The topic of the messages. + topic_hash: TopicHash, + /// A list of known message ids (peer_id + sequence _number) as a string. + message_ids: Vec, + }, + /// The node requests specific message ids (peer_id + sequence _number) - IWant control message. + IWant { + /// A list of known message ids (peer_id + sequence _number) as a string. + message_ids: Vec, + }, + /// The node has been added to the mesh - Graft control message. + Graft { + /// The mesh topic the peer should be added to. + topic_hash: TopicHash, + }, + /// The node has been removed from the mesh - Prune control message. + Prune { + /// The mesh topic the peer should be removed from. + topic_hash: TopicHash, + }, +} diff --git a/protocols/gossipsub/src/rpc.proto b/protocols/gossipsub/src/rpc.proto new file mode 100644 index 00000000000..1aa19430aa2 --- /dev/null +++ b/protocols/gossipsub/src/rpc.proto @@ -0,0 +1,75 @@ +syntax = "proto2"; + +package gossipsub.pb; + +message RPC { + repeated SubOpts subscriptions = 1; + repeated Message publish = 2; + + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubscribe + optional string topic_id = 2; + } + + optional ControlMessage control = 3; +} + +message Message { + optional bytes from = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topic_ids = 4; +} + +message ControlMessage { + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; +} + +message ControlIHave { + optional string topic_id = 1; + repeated string message_ids = 2; +} + +message ControlIWant { + repeated string message_ids= 1; +} + +message ControlGraft { + optional string topic_id = 1; +} + +message ControlPrune { + optional string topic_id = 1; +} + +// topicID = hash(topicDescriptor); (not the topic.name) +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 3; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes key_hashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode 
+            NONE = 0; // no encryption, anyone can read
+            SHAREDKEY = 1; // messages are encrypted with shared key
+            WOT = 2; // web of trust, certificates can allow publisher set to grow
+        }
+    }
+}
diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs
new file mode 100644
index 00000000000..6eacb9b3265
--- /dev/null
+++ b/protocols/gossipsub/src/topic.rs
@@ -0,0 +1,93 @@
+// Copyright 2020 Sigma Prime Pty Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use crate::rpc_proto;
+use base64::encode;
+use prost::Message;
+use sha2::{Digest, Sha256};
+use std::fmt;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct TopicHash {
+    /// The topic hash. Stored as a string to align with the protobuf API.
+    hash: String,
+}
+
+impl TopicHash {
+    pub fn from_raw(hash: impl Into<String>) -> TopicHash {
+        TopicHash { hash: hash.into() }
+    }
+
+    pub fn into_string(self) -> String {
+        self.hash
+    }
+
+    pub fn as_str(&self) -> &str {
+        &self.hash
+    }
+}
+
+/// A gossipsub topic.
+#[derive(Debug, Clone)]
+pub struct Topic {
+    topic: String,
+}
+
+impl Topic {
+    pub fn new(topic: String) -> Self {
+        Topic { topic }
+    }
+
+    /// Creates a `TopicHash` by SHA256-hashing the topic, then base64-encoding the
+    /// hash.
+    pub fn sha256_hash(&self) -> TopicHash {
+        let topic_descriptor = rpc_proto::TopicDescriptor {
+            name: Some(self.topic.clone()),
+            auth: None,
+            enc: None,
+        };
+        let mut bytes = Vec::with_capacity(topic_descriptor.encoded_len());
+        topic_descriptor
+            .encode(&mut bytes)
+            .expect("buffer is large enough");
+        let hash = encode(Sha256::digest(&bytes).as_slice());
+
+        TopicHash { hash }
+    }
+
+    /// Creates a `TopicHash` from the raw topic string, without hashing.
+    pub fn no_hash(&self) -> TopicHash {
+        TopicHash {
+            hash: self.topic.clone(),
+        }
+    }
+}
+
+impl fmt::Display for Topic {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.topic)
+    }
+}
+
+impl fmt::Display for TopicHash {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.hash)
+    }
+}
diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs
new file mode 100644
index 00000000000..fc1185dd87c
--- /dev/null
+++ b/protocols/gossipsub/tests/smoke.rs
@@ -0,0 +1,224 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use futures::prelude::*;
+use log::debug;
+use quickcheck::{QuickCheck, TestResult};
+use rand::{random, seq::SliceRandom, SeedableRng};
+use std::{
+    io::Error,
+    pin::Pin,
+    task::{Context, Poll},
+    time::Duration,
+};
+
+use libp2p_core::{
+    identity,
+    multiaddr::Protocol,
+    muxing::StreamMuxerBox,
+    nodes::Substream,
+    transport::{boxed::Boxed, MemoryTransport},
+    upgrade, Multiaddr, PeerId, Transport,
+};
+use libp2p_gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic};
+use libp2p_plaintext::PlainText2Config;
+use libp2p_swarm::Swarm;
+use libp2p_yamux as yamux;
+
+type TestSwarm =
+    Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Gossipsub<Substream<StreamMuxerBox>>>;
+
+struct Graph {
+    pub nodes: Vec<(Multiaddr, TestSwarm)>,
+}
+
+impl Future for Graph {
+    type Output = (Multiaddr, GossipsubEvent);
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        for (addr, node) in &mut self.nodes {
+            match node.poll_next_unpin(cx) {
+                Poll::Ready(Some(event)) => return Poll::Ready((addr.clone(), event)),
+                Poll::Ready(None) => panic!("unexpected None when polling nodes"),
+                Poll::Pending => {}
+            }
+        }
+
+        Poll::Pending
+    }
+}
+
+impl Graph {
+    fn new_connected(num_nodes: usize, seed: u64) -> Graph {
+        if num_nodes == 0 {
+            panic!("expecting at least one node");
+        }
+
+        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
+
+        let mut not_connected_nodes = std::iter::once(())
+            .cycle()
+            .take(num_nodes)
+            .map(|_| build_node())
+            .collect::<Vec<_>>();
+
+        let mut connected_nodes = vec![not_connected_nodes.pop().unwrap()];
+
+        while !not_connected_nodes.is_empty() {
+            connected_nodes.shuffle(&mut rng);
+            not_connected_nodes.shuffle(&mut rng);
+
+            let mut next = not_connected_nodes.pop().unwrap();
+            let connected_addr = &connected_nodes[0].0;
+
+            // Memory transport cannot handle addresses with a `/p2p` suffix.
+            let mut connected_addr_no_p2p = connected_addr.clone();
+            let p2p_suffix_connected = connected_addr_no_p2p.pop();
+
+            debug!(
+                "Connect: {} -> {}",
+                next.0.clone().pop().unwrap(),
+                p2p_suffix_connected.unwrap()
+            );
+
+            Swarm::dial_addr(&mut next.1, connected_addr_no_p2p).unwrap();
+
+            connected_nodes.push(next);
+        }
+
+        Graph {
+            nodes: connected_nodes,
+        }
+    }
+
+    /// Polls the graph and passes each event into the provided `FnMut` until it returns `true`.
+    fn wait_for<F>(self, mut f: F) -> Self
+    where
+        F: FnMut(GossipsubEvent) -> bool,
+    {
+        // The future below should return self. Given that it is a FnMut and not a FnOnce, one needs
+        // to wrap `self` in an Option, leaving a `None` behind after the final `Poll::Ready`.
+        let mut this = Some(self);
+
+        let fut = futures::future::poll_fn(move |cx| match &mut this {
+            Some(graph) => loop {
+                match graph.poll_unpin(cx) {
+                    Poll::Ready((_addr, ev)) => {
+                        if f(ev) {
+                            return Poll::Ready(this.take().unwrap());
+                        }
+                    }
+                    Poll::Pending => return Poll::Pending,
+                }
+            },
+            None => panic!("future called after final return"),
+        });
+
+        let fut = async_std::future::timeout(Duration::from_secs(10), fut);
+
+        futures::executor::block_on(fut).unwrap()
+    }
+}
+
+fn build_node() -> (Multiaddr, TestSwarm) {
+    let key = identity::Keypair::generate_ed25519();
+    let public_key = key.public();
+
+    let transport = MemoryTransport::default()
+        .upgrade(upgrade::Version::V1)
+        .authenticate(PlainText2Config {
+            local_public_key: public_key.clone(),
+        })
+        .multiplex(yamux::Config::default())
+        .map(|(p, m), _| (p, StreamMuxerBox::new(m)))
+        .map_err(|e| panic!("Failed to create transport: {:?}", e))
+        .boxed();
+
+    let peer_id = public_key.clone().into_peer_id();
+    let behaviour = Gossipsub::new(peer_id.clone(), GossipsubConfig::default());
+    let mut swarm = Swarm::new(transport, behaviour, peer_id);
+
+    let port = 1 + random::<u64>();
+    let mut addr: Multiaddr = Protocol::Memory(port).into();
+    Swarm::listen_on(&mut swarm, addr.clone()).unwrap();
+
+    addr = addr.with(libp2p_core::multiaddr::Protocol::P2p(
+        public_key.into_peer_id().into(),
+    ));
+
+    (addr, swarm)
+}
+
+#[test]
+fn multi_hop_propagation() {
+    let _ = env_logger::try_init();
+
+    fn prop(num_nodes: usize, seed: u64) -> TestResult {
+        if num_nodes < 2 || num_nodes > 100 {
+            return TestResult::discard();
+        }
+
+        debug!("number of nodes: {:?}, seed: {:?}", num_nodes, seed);
+
+        let mut graph = Graph::new_connected(num_nodes, seed);
+        let number_nodes = graph.nodes.len();
+
+        // Subscribe each node to the same topic.
+        let topic = Topic::new("test-net".into());
+        for (_addr, node) in &mut graph.nodes {
+            node.subscribe(topic.clone());
+        }
+
+        // Wait for all nodes to be subscribed.
+        let mut subscribed = 0;
+        graph = graph.wait_for(move |ev| {
+            if let GossipsubEvent::Subscribed { .. } = ev {
+                subscribed += 1;
+                if subscribed == (number_nodes - 1) * 2 {
+                    return true;
+                }
+            }
+
+            false
+        });
+
+        // Publish a single message.
+        graph.nodes[0].1.publish(&topic, vec![1, 2, 3]);
+
+        // Wait for all nodes to receive the published message.
+        let mut received_msgs = 0;
+        graph.wait_for(move |ev| {
+            if let GossipsubEvent::Message(..) = ev {
+                received_msgs += 1;
+                if received_msgs == number_nodes - 1 {
+                    return true;
+                }
+            }
+
+            false
+        });
+
+        TestResult::passed()
+    }
+
+    QuickCheck::new()
+        .max_tests(10)
+        .quickcheck(prop as fn(usize, u64) -> TestResult)
+}
diff --git a/src/lib.rs b/src/lib.rs
index 6aee5fc7d28..2d762a80070 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -174,6 +174,8 @@ pub use libp2p_kad as kad;
 #[doc(inline)]
 pub use libp2p_floodsub as floodsub;
 #[doc(inline)]
+pub use libp2p_gossipsub as gossipsub;
+#[doc(inline)]
 pub use libp2p_mplex as mplex;
 #[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]
 #[doc(inline)]
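
Reviewer note: the `GossipsubControlAction` enum in `protocol.rs` mirrors the `ControlMessage` fields in `rpc.proto`. The sketch below is a hypothetical, crate-internal illustration of that correspondence and is not part of this diff; it assumes the standard prost proto2 codegen (`optional string` → `Option<String>`, `repeated string` → `Vec<String>`), that the generated module is exposed as `crate::rpc_proto` (as `topic.rs` suggests), and that `MessageId` is a tuple struct around a `String`, as used in the chat example.

```rust
// Hypothetical helper, for illustration only (not part of this diff).
use crate::protocol::MessageId;
use crate::rpc_proto;
use crate::topic::TopicHash;

/// Builds the wire representation of an IHave control action.
fn ihave_to_proto(topic_hash: TopicHash, message_ids: Vec<MessageId>) -> rpc_proto::ControlIHave {
    rpc_proto::ControlIHave {
        // The wire format carries the topic hash as a plain string.
        topic_id: Some(topic_hash.into_string()),
        // Message ids (peer_id + sequence_number) are sent as strings.
        message_ids: message_ids.into_iter().map(|id| id.0).collect(),
    }
}
```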
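
Similarly, the two hashing modes in `topic.rs` can be sanity-checked with a small unit test. The following sketch is hypothetical (not part of this diff; the module and test names are illustrative) and relies only on the API introduced above: `Topic::new`, `Topic::no_hash`, `Topic::sha256_hash`, and `TopicHash::from_raw`.

```rust
// Hypothetical unit test for protocols/gossipsub/src/topic.rs (not part of this diff).
#[cfg(test)]
mod tests {
    use super::{Topic, TopicHash};

    #[test]
    fn raw_and_sha256_hashes_differ() {
        let topic = Topic::new("test-net".into());

        // `no_hash` keeps the topic name verbatim, so it round-trips through `from_raw`.
        let raw = topic.no_hash();
        assert_eq!(raw, TopicHash::from_raw("test-net"));
        assert_eq!(raw.as_str(), "test-net");

        // `sha256_hash` encodes a TopicDescriptor, hashes it, and base64-encodes the
        // digest, so it differs from the raw form but is stable across calls.
        let hashed = topic.sha256_hash();
        assert_ne!(hashed, raw);
        assert_eq!(hashed, topic.sha256_hash());
    }
}
```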