Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossipsub Protocol #898

Merged
merged 113 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
e9b7059
Create gossipsub crate - Basic template, borrowed from floodsub
AgeManning Jan 15, 2019
e09cb3f
Add a GossipsubConfig struct and set up basic structures in the Gossi…
AgeManning Jan 16, 2019
65f3211
Begin implementation of join. Adds get_random_peers helper function a…
AgeManning Jan 16, 2019
bcef5ff
Implements gossipsub leave()
AgeManning Jan 17, 2019
fb0160f
Update publishMany to incorporate gossipsub mesh and fanout logic
AgeManning Jan 17, 2019
e0933dd
Use the gossipsub mesh for determining peer subscription
AgeManning Jan 17, 2019
6de264e
Remove subscribed_topics field from the Gossipsub struct
AgeManning Jan 17, 2019
e3c3d0f
Rename gossipsubconfig to ProtocolConfig
AgeManning Jan 17, 2019
47dbb69
Implement the gossipsub control messages into the Codec's Encode/Deco…
AgeManning Jan 17, 2019
e05f363
Modify GossipsubActions to enums for succinctness.
AgeManning Jan 21, 2019
53c364b
Modify the memcache to store Gossipsub messages
AgeManning Jan 21, 2019
cb5e20b
Implement control message handling.
AgeManning Jan 21, 2019
e865ed5
Update control message handling to handle multiple messages.
AgeManning Jan 21, 2019
80673d1
Handle received gossipsub messages using pre-built handlers.
AgeManning Jan 21, 2019
1aa63ad
Remove excess connected peer hashmap
AgeManning Jan 21, 2019
db345bb
Add extra peer mapping and consistent topic naming.
AgeManning Jan 22, 2019
5531342
Implement heartbeat, emit_gossip and send_graft_prune.
AgeManning Jan 23, 2019
a29ae37
Group logic in forwarding messages. Add messages to memcache.
AgeManning Jan 23, 2019
a85ee9a
Add heartbeat timer and move location of helper function.
AgeManning Jan 23, 2019
e0142b6
Add gossipsub the libp2p workspace, makes layer structs public
AgeManning Jan 23, 2019
8cdaf6b
Add logging to gossipsub
AgeManning Jan 24, 2019
deccbe5
Add example chat for debugging purposes
AgeManning Jan 24, 2019
e69a772
Implement #868 for gossipsub.
AgeManning Jan 24, 2019
3dad3ea
Add rust documentation to gossipsub crate.
AgeManning Jan 28, 2019
77047c3
Re-introduce the initial heartbeat time config.
AgeManning Jan 28, 2019
c7b3ca5
Add subscribe tests.
AgeManning Jan 28, 2019
2b74f28
Add Bug fixes and further testing for gossipsub.
AgeManning Jan 29, 2019
0dd9cd2
Rename GossipsubMessage::msg_id -> id
AgeManning Jan 29, 2019
2442d50
Add bug fix for handling disconnected peers.
AgeManning Jan 29, 2019
abfa4da
Implements (partially) #889 for Gossipsub.
AgeManning Jan 29, 2019
f04d242
handle_iwant event count tests
g-r-a-n-t Jan 31, 2019
3c8ebe5
handle_ihave event count tests
g-r-a-n-t Feb 1, 2019
12e0703
Move layer.rs tests into separate file.
AgeManning Feb 1, 2019
6a126e6
Implement clippy suggestions for gossipsub.
AgeManning Feb 1, 2019
f8277fa
Modify control message tests for specific types.
AgeManning Feb 1, 2019
9165964
Implement builder pattern for GossipsubConfig.
AgeManning Feb 2, 2019
a46ecc7
Package version updates as suggested by @twittner.
AgeManning Feb 2, 2019
ec522f2
Correct line lengths in gossipsub.
AgeManning Feb 2, 2019
e5d85c9
Correct braces in found by @twittner.
AgeManning Feb 2, 2019
4a08459
Implement @twittner's suggestions.
AgeManning Feb 2, 2019
0a51e3f
Add NodeList struct to clarify topic_peers.
AgeManning Feb 2, 2019
2e855f4
Cleaner handling of messagelist
twittner Feb 2, 2019
8d32626
Cleaner handling of added peers.
twittner Feb 2, 2019
e78f417
handle_prune peer removed test
g-r-a-n-t Feb 4, 2019
30f98dc
basic grafting tests
g-r-a-n-t Feb 6, 2019
27414e3
multiple topic grafting test
g-r-a-n-t Feb 7, 2019
4ed865f
Convert &vec to slice.
twittner Feb 21, 2019
67ec8cc
Convert to lazy insert.
twittner Feb 21, 2019
1566043
Cleaner topic handling.
twittner Feb 21, 2019
dc5eddf
Merge branch 'gossipsub' of github.com:sigp/rust-libp2p into piggy-ba…
g-r-a-n-t Feb 22, 2019
866e507
control pool piggybacking
g-r-a-n-t Feb 22, 2019
faa7b03
Add Debug derives to gossipsub and correct tests.
AgeManning Feb 26, 2019
0bf1bcd
Merge master to gossipsub.
AgeManning Feb 26, 2019
e2a6c38
Merge branch 'gossipsub' of github.com:sigp/rust-libp2p into piggy-ba…
g-r-a-n-t Mar 2, 2019
d5459e7
changes from PR
g-r-a-n-t Mar 2, 2019
7ba4787
Merge pull request #3 from g-r-a-n-t/piggy-backing
AgeManning Mar 3, 2019
706c1bc
Merge latest master
AgeManning Mar 10, 2019
ed523b8
Implements Arc for GossipsubRpc events
AgeManning Mar 11, 2019
97e433e
Remove support for floodsub nodes
AgeManning Mar 11, 2019
2d9b125
Reconnected to disconnected peers, to mitigate timeout
AgeManning Mar 26, 2019
4d9cd66
Merge latest master
AgeManning Mar 26, 2019
987fec3
Use ReadOne WriteOne with configurable max gossip sizes
AgeManning Mar 26, 2019
b3c32d9
Remove length delimination from RPC encoding
AgeManning Mar 26, 2019
666d1f1
Prevent peer duplication in mesh
AgeManning Apr 2, 2019
fd6b1ab
Allow oneshot handler's inactivity_timeout to be configurable
AgeManning Apr 2, 2019
fb852bc
Implement inactivity timeout and update to latest master
AgeManning Apr 2, 2019
25ce0fa
Correct peer duplication in mesh bug
AgeManning Apr 30, 2019
9ccb186
Remove auto-reconnect to allow for user-level disconnects
AgeManning Apr 30, 2019
a6445a5
Update to latest master
AgeManning Apr 30, 2019
81e552d
Merge latest master
AgeManning Jun 18, 2019
5c612b6
Merge branch 'master' into gossipsub
AgeManning Jun 25, 2019
5284d01
Update to latest master
AgeManning Jul 26, 2019
9c56b84
Single long-lived inbound/outbound streams to match go implementation
AgeManning Jul 28, 2019
6169fba
Allow gossipsub topics to be optionally hashable
AgeManning Jul 29, 2019
7a9de4c
Improves gossipsub stream handling
AgeManning Aug 24, 2019
7680160
Update to latest master
AgeManning Aug 24, 2019
01632ef
Allows message validation in gossipsub
AgeManning Sep 3, 2019
697e46a
Replaces Cuckoofilter with LRUCache
AgeManning Sep 10, 2019
684956d
Merge latest master, updates protobuf
AgeManning Sep 29, 2019
264cf33
Renames configuration parameter and corrects logic
AgeManning Oct 24, 2019
bf7ad5c
Removes peer from fanout on disconnection
AgeManning Oct 29, 2019
eb1c5fc
Updates to the latest master
AgeManning Oct 29, 2019
6c5988f
Add publish and fanout tests
pawanjay176 Nov 6, 2019
2744b12
Merge pull request #17 from pawanjay176/gossipsub-tests
AgeManning Nov 10, 2019
3843c04
Apply @mxinden suggestions
AgeManning Nov 23, 2019
b805666
Merge latest master
AgeManning Nov 29, 2019
76b6e1f
Resend message if outbound stream negotiated
AgeManning Dec 9, 2019
8716930
Implement further reviewer suggestions
AgeManning Dec 9, 2019
82cf4dc
Add MessageId type and remove unnecessary comments
AgeManning Dec 13, 2019
9d69802
Add a return value to propagate_message function
AgeManning Dec 13, 2019
12226bc
Adds user-customised gossipsub message ids
AgeManning Dec 16, 2019
3af999a
Adds the message id to GossipsubEvent
AgeManning Dec 16, 2019
0bdf093
Implement Debug for GossipsubConfig
AgeManning Dec 16, 2019
6ac92bc
protocols/gossipsub: Add basic smoke test
mxinden Dec 23, 2019
a84d243
Merge latest master
AgeManning Jan 3, 2020
1774a76
Corrections pointed out by @mxinden
AgeManning Jan 3, 2020
776d13e
Add option to remove source id publishing
AgeManning Jan 3, 2020
ff9565b
protocols/gossipsub/tests/smoke: Remove unused variable
mxinden Jan 6, 2020
592cc9d
Merge pull request #21 from mxinden/gossipsub-smoke
AgeManning Jan 8, 2020
a255768
Merge latest master
AgeManning Jan 15, 2020
4d969ef
protocols/gossipsub: Move to stable futures
mxinden Jan 2, 2020
6375130
examples/gossipsub-chat.rs: Move to stable futures
mxinden Jan 2, 2020
1395205
protocols/gossipsub/src/behaviour/tests: Update to stable futures
mxinden Jan 2, 2020
542b100
protocols/gossipsub/tests: Update to stable futures
mxinden Jan 15, 2020
0c054df
protocols/gossipsub: Log substream errors
mxinden Jan 20, 2020
93b5bb2
protocols/gossipsub: Log outbound substream errors
mxinden Jan 20, 2020
a9e19a5
Merge pull request #22 from mxinden/gossipsub-stable-futures
AgeManning Jan 23, 2020
2b7bdbd
Merge in latest master
AgeManning Jan 23, 2020
684968c
Remove rust-fmt formatting
AgeManning Jan 23, 2020
1ca966b
Shift to prost for protobuf compiling
AgeManning Jan 24, 2020
a565efc
Merge branch 'master' into gossipsub
AgeManning Jan 24, 2020
25a9636
Use wasm_timer for wasm compatibility
AgeManning Jan 24, 2020
1033831
Merge branch 'master' into gossipsub
tomaka Jan 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ libp2p-mplex = { version = "0.3.0", path = "./muxers/mplex" }
libp2p-identify = { version = "0.3.0", path = "./protocols/identify" }
libp2p-kad = { version = "0.3.0", path = "./protocols/kad" }
libp2p-floodsub = { version = "0.3.0", path = "./protocols/floodsub" }
libp2p-gossipsub = { version = "0.1.0", path = "./protocols/gossipsub" }
libp2p-ping = { version = "0.3.0", path = "./protocols/ping" }
libp2p-plaintext = { version = "0.3.0", path = "./protocols/plaintext" }
libp2p-ratelimit = { version = "0.3.0", path = "./transports/ratelimit" }
Expand Down Expand Up @@ -64,6 +65,7 @@ members = [
"muxers/mplex",
"muxers/yamux",
"protocols/floodsub",
"protocols/gossipsub",
"protocols/identify",
"protocols/kad",
"protocols/noise",
Expand Down
30 changes: 30 additions & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "libp2p-gossipsub"
version = "0.1.0"
authors = ["Age Manning <age@agemanning.com>"]
license = "MIT"

[dependencies]
libp2p-core = { path = "../../core" }
libp2p-floodsub = { path = "../floodsub" }
bs58 = "0.2.2"
bytes = "0.4.11"
byteorder = "1.3.1"
cuckoofilter = "0.3.2"
fnv = "1.0.6"
futures = "0.1.25"
protobuf = "2.3.0"
rand = "0.6.5"
smallvec = "0.6.8"
tokio-codec = "0.1.1"
tokio-io = "0.1.11"
tokio-timer = "0.2.8"
unsigned-varint = "0.2.1"
log = "0.4.6"


[dev-dependencies]
libp2p = { path = "../../" }
tokio = "0.1"
tokio-stdin-stdout = "0.1"
env_logger = "0.6.0"
88 changes: 88 additions & 0 deletions protocols/gossipsub/examples/chat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
extern crate env_logger;
extern crate futures;
extern crate libp2p;
extern crate log;
extern crate tokio;

use env_logger::{Builder, Env};
use futures::prelude::*;
use libp2p::gossipsub::GossipsubEvent;
use libp2p::{
gossipsub, secio,
tokio_codec::{FramedRead, LinesCodec},
};
use std::time::Duration;

fn main() {
Builder::from_env(Env::default().default_filter_or("debug")).init();

// Create a random PeerId
let local_key = secio::SecioKeyPair::ed25519_generated().unwrap();
let local_peer_id = local_key.to_peer_id();
println!("Local peer id: {:?}", local_peer_id);

// Set up an encrypted TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key);

// Create a Floodsub/Gossipsub topic
let topic = libp2p::floodsub::TopicBuilder::new("test").build();

// Create a Swarm to manage peers and events
let mut swarm = {
// set default parameters for gossipsub
// let gossipsub_config = gossipsub::GossipsubConfig::default();
// set custom gossipsub
let gossipsub_config = gossipsub::GossipsubConfigBuilder::new()
.heartbeat_interval(Duration::from_secs(10))
.build();
// build a gossipsub network behaviour
let mut gossipsub = gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config);
gossipsub.subscribe(topic.clone());
libp2p::Swarm::new(transport, gossipsub, local_peer_id)
};

// Listen on all interfaces and whatever port the OS assigns
let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
println!("Listening on {:?}", addr);

// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
let dialing = to_dial.clone();
match to_dial.parse() {
Ok(to_dial) => match libp2p::Swarm::dial_addr(&mut swarm, to_dial) {
Ok(_) => println!("Dialed {:?}", dialing),
Err(e) => println!("Dial {:?} failed: {:?}", dialing, e),
},
Err(err) => println!("Failed to parse address to dial: {:?}", err),
}
}

// Read full lines from stdin
let stdin = tokio_stdin_stdout::stdin(0);
let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());

// Kick it off
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
loop {
match framed_stdin.poll().expect("Error while polling stdin") {
Async::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()),
Async::Ready(None) => panic!("Stdin closed"),
Async::NotReady => break,
};
}

loop {
match swarm.poll().expect("Error while polling swarm") {
Async::Ready(Some(gossip_event)) => match gossip_event {
GossipsubEvent::Message(message) => {
println!("Got message: {:?}", String::from_utf8_lossy(&message.data))
}
_ => {}
},
Async::Ready(None) | Async::NotReady => break,
}
}

Ok(Async::NotReady)
}));
}
13 changes: 13 additions & 0 deletions protocols/gossipsub/regen_structs_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh

# This script regenerates the `src/rpc_proto.rs` file from `rpc.proto`.

docker run --rm -v `pwd`:/usr/code:z -w /usr/code rust /bin/bash -c " \
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
apt-get update; \
apt-get install -y protobuf-compiler; \
cargo install --version 2.3.0 protobuf-codegen; \
protoc --rust_out . rpc.proto"

sudo chown $USER:$USER *.rs

mv -f rpc.rs ./src/rpc_proto.rs
75 changes: 75 additions & 0 deletions protocols/gossipsub/rpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
syntax = "proto2";

package gossipsub.pb;

message RPC {
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;

message SubOpts {
optional bool subscribe = 1; // subscribe or unsubscribe
optional string topicid = 2;
}

optional ControlMessage control = 3;
}

message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
}

message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
}

message ControlIHave {
optional string topicID = 1;
repeated string messageIDs = 2;
}

message ControlIWant {
repeated string messageIDs = 1;
}

message ControlGraft {
optional string topicID = 1;
}

message ControlPrune {
optional string topicID = 1;
}

// topicID = hash(topicDescriptor); (not the topic.name)
message TopicDescriptor {
optional string name = 1;
optional AuthOpts auth = 2;
optional EncOpts enc = 3;

message AuthOpts {
optional AuthMode mode = 1;
repeated bytes keys = 2; // root keys to trust

enum AuthMode {
NONE = 0; // no authentication, anyone can publish
KEY = 1; // only messages signed by keys in the topic descriptor are accepted
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}

message EncOpts {
optional EncMode mode = 1;
repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted)
AgeManning marked this conversation as resolved.
Show resolved Hide resolved

enum EncMode {
NONE = 0; // no encryption, anyone can read
SHAREDKEY = 1; // messages are encrypted with shared key
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
}
167 changes: 167 additions & 0 deletions protocols/gossipsub/src/gossipsub_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::time::Duration;

/// Configuration parameters that define the performance of the gossipsub network.
#[derive(Clone)]
pub struct GossipsubConfig {
/// Overlay network parameters.
/// Number of heartbeats to keep in the `memcache`.
pub history_length: usize,
/// Number of past heartbeats to gossip about.
pub history_gossip: usize,

/// Target number of peers for the mesh network (D in the spec).
pub mesh_n: usize,
/// Minimum number of peers in mesh network before adding more (D_lo in the spec).
pub mesh_n_low: usize,
/// Maximum number of peers in mesh network before removing some (D_high in the spec).
pub mesh_n_high: usize,

/// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec).
pub gossip_lazy: usize,

/// Initial delay in each heartbeat.
pub heartbeat_initial_delay: Duration,
/// Time between each heartbeat.
pub heartbeat_interval: Duration,
/// Time to live for fanout peers.
pub fanout_ttl: Duration,
}

impl Default for GossipsubConfig {
fn default() -> GossipsubConfig {
GossipsubConfig {
history_length: 5,
history_gossip: 3,
mesh_n: 6,
mesh_n_low: 4,
mesh_n_high: 12,
gossip_lazy: 6, // default to mesh_n
heartbeat_initial_delay: Duration::from_secs(5),
heartbeat_interval: Duration::from_secs(1),
fanout_ttl: Duration::from_secs(60),
}
}
}

pub struct GossipsubConfigBuilder {
history_length: usize,
/// Number of past heartbeats to gossip about.
history_gossip: usize,

/// Target number of peers for the mesh network (D in the spec).
mesh_n: usize,
/// Minimum number of peers in mesh network before adding more (D_lo in the spec).
mesh_n_low: usize,
/// Maximum number of peers in mesh network before removing some (D_high in the spec).
mesh_n_high: usize,

/// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec).
gossip_lazy: usize,

/// Initial delay in each heartbeat.
heartbeat_initial_delay: Duration,
/// Time between each heartbeat.
heartbeat_interval: Duration,
/// Time to live for fanout peers.
fanout_ttl: Duration,
}

impl Default for GossipsubConfigBuilder {
fn default() -> GossipsubConfigBuilder {
GossipsubConfigBuilder {
history_length: 5,
history_gossip: 3,
mesh_n: 6,
mesh_n_low: 4,
mesh_n_high: 12,
gossip_lazy: 6, // default to mesh_n
heartbeat_initial_delay: Duration::from_secs(5),
heartbeat_interval: Duration::from_secs(1),
fanout_ttl: Duration::from_secs(60),
}
}
}

impl GossipsubConfigBuilder {
// set default values
pub fn new() -> GossipsubConfigBuilder {
GossipsubConfigBuilder::default()
}

pub fn history_length(&mut self, history_length: usize) -> &mut Self {
assert!(
history_length >= self.history_gossip,
"The history_length must be greater than or equal to the history_gossip length"
);
self.history_length = history_length;
self
}

pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self {
assert!(
self.history_length >= history_gossip,
"The history_length must be greater than or equal to the history_gossip length"
);
self.history_gossip = history_gossip;
self
}

pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self {
assert!(
self.mesh_n_low <= mesh_n && mesh_n <= self.mesh_n_high,
"The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high"
);
self.mesh_n = mesh_n;
self
}

pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self {
assert!(
mesh_n_low <= self.mesh_n && self.mesh_n <= self.mesh_n_high,
"The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high"
);
self.mesh_n_low = mesh_n_low;
self
}

pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self {
assert!(
self.mesh_n_low <= self.mesh_n && self.mesh_n <= mesh_n_high,
"The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high"
);
self.mesh_n_high = mesh_n_high;
self
}

pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self {
self.gossip_lazy = gossip_lazy;
self
}

pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self {
self.heartbeat_initial_delay = heartbeat_initial_delay;
self
}
pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self {
self.heartbeat_interval = heartbeat_interval;
self
}
pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self {
self.fanout_ttl = fanout_ttl;
self
}

pub fn build(&self) -> GossipsubConfig {
GossipsubConfig {
history_length: self.history_length,
history_gossip: self.history_gossip,
mesh_n: self.mesh_n,
mesh_n_low: self.mesh_n_low,
mesh_n_high: self.mesh_n_high,
gossip_lazy: self.gossip_lazy,
heartbeat_initial_delay: self.heartbeat_initial_delay,
heartbeat_interval: self.heartbeat_interval,
fanout_ttl: self.fanout_ttl,
}
}
}
Loading