Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Kademlia discovery #501

Merged
merged 9 commits into from
Jun 16, 2020
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ listening_multiaddr = "<multiaddress>"
bootstrap_peers = ["<multiaddress>"]
```

Example of a [multiaddress](https://github.com/multiformats/multiaddr): `"/ip4/54.186.82.90/tcp/1347"`
Example of a [multiaddress](https://github.com/multiformats/multiaddr): `"/ip4/54.186.82.90/tcp/1347/p2p/12D3K1oWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs"`

### Logging

Expand Down
31 changes: 16 additions & 15 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,17 +848,7 @@ where
// update sync state to Bootstrap indicating we are acquiring a 'secure enough' set of peers
self.set_state(SyncState::Bootstrap);

// TODO change from using random peerID to managed
while self.peer_manager.is_empty().await {
warn!("No valid peers to sync, waiting for other nodes");
task::sleep(Duration::from_secs(5)).await;
}

let peer_id = self
.peer_manager
.get_peer()
.await
.expect("Peer set is not empty here");
let peer_id = self.get_peer().await;

// checkpoint established
self.set_state(SyncState::Checkpoint);
Expand Down Expand Up @@ -934,15 +924,14 @@ where
}
/// fork detected, collect tipsets to be included in return_set sync_headers_reverse
async fn sync_fork(&mut self, head: &Tipset, to: &Tipset) -> Result<Vec<Tipset>, Error> {
// TODO change from using random peerID to managed
let peer_id = PeerId::random();
// pulled from Lotus: https://github.com/filecoin-project/lotus/blob/master/chain/sync.go#L996
let peer_id = self.get_peer().await;
// TODO move to shared parameter (from actors crate most likely)
const FORK_LENGTH_THRESHOLD: u64 = 500;

// Load blocks from network using blocksync
let tips: Vec<Tipset> = self
.network
.blocksync_headers(peer_id.clone(), head.parents(), FORK_LENGTH_THRESHOLD)
.blocksync_headers(peer_id, head.parents(), FORK_LENGTH_THRESHOLD)
.await
.map_err(|_| Error::Other("Could not retrieve tipset".to_string()))?;

Expand Down Expand Up @@ -982,6 +971,18 @@ where
pub fn set_state(&mut self, new_state: SyncState) {
self.state = new_state
}

async fn get_peer(&self) -> PeerId {
while self.peer_manager.is_empty().await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this can't find a peer, it will loop indefinitely, is it worth it to have a time out mechanism?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe implement a try_get_peer() that takes in a timeout variable as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhat intentional, because it can't do anything else until it has a peer to sync with. The logic hasn't changed, I just put in a function because there was another place it should be used.

The logic is definitely a bit broken now after the logic had been refactored since this was setup, but I don't want to cause conflicts with the work Erlich is doing in a bit. This shouldn't be an issue once the syncing setup is improved, but maybe something to keep in mind @ec2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yeah, definitely agree a timeout is worthwhile to have, just probably after the setup is refactored (unless you guys think otherwise)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is totally fine to me!

warn!("No valid peers to sync, waiting for other nodes");
task::sleep(Duration::from_secs(5)).await;
}

self.peer_manager
.get_peer()
.await
.expect("Peer set is not empty here")
}
}

/// Returns message root CID from bls and secp message contained in the param Block
Expand Down
1 change: 1 addition & 0 deletions forest/src/logger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) fn setup_logger() {
logger_builder.parse_filters(&s);
} else {
// If no ENV variable specified, default to info
logger_builder.filter(Some("libp2p_gossipsub"), LevelFilter::Warn);
logger_builder.filter(None, LevelFilter::Info);
}
let logger = logger_builder.build();
Expand Down
5 changes: 2 additions & 3 deletions forest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {

// Capture CLI inputs
let cli = cli::CLI::from_args();
let mut config = cli.get_config().expect("CLI error");
let config = cli.get_config().expect("CLI error");

let net_keypair = match get_keypair(&format!("{}{}", &config.data_dir, "/libp2p/keypair")) {
Some(kp) => kp,
Expand Down Expand Up @@ -56,8 +56,7 @@ fn main() {
initialize_genesis(&config.genesis_file, &mut chain_store).unwrap();

// Libp2p service setup
config.network.set_network_name(&network_name);
let p2p_service = Libp2pService::new(&config.network, net_keypair);
let p2p_service = Libp2pService::new(config.network, net_keypair, &network_name);
let network_rx = p2p_service.network_receiver();
let network_send = p2p_service.network_sender();

Expand Down
108 changes: 88 additions & 20 deletions node/forest_libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,45 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::rpc::{RPCEvent, RPCMessage, RPC};
use crate::config::Libp2pConfig;
use libp2p::core::identity::Keypair;
use libp2p::core::PeerId;
use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash};
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryId};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::multiaddr::Protocol;
use libp2p::ping::{
handler::{PingFailure, PingSuccess},
Ping, PingEvent,
};
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use libp2p::NetworkBehaviour;
use log::{debug, warn};
use log::{debug, trace, warn};
use std::collections::HashSet;
use std::{task::Context, task::Poll};

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "ForestBehaviourEvent", poll_method = "poll")]
pub struct ForestBehaviour {
pub gossipsub: Gossipsub,
pub mdns: Mdns,
pub ping: Ping,
pub identify: Identify,
pub rpc: RPC,
gossipsub: Gossipsub,
// TODO configure to allow turning mdns off
mdns: Mdns,
ping: Ping,
identify: Identify,
rpc: RPC,
kademlia: Kademlia<MemoryStore>,
#[behaviour(ignore)]
events: Vec<ForestBehaviourEvent>,
#[behaviour(ignore)]
peers: HashSet<PeerId>,
}

#[derive(Debug)]
pub enum ForestBehaviourEvent {
PeerDialed(PeerId),
PeerDisconnected(PeerId),
DiscoveredPeer(PeerId),
ExpiredPeer(PeerId),
GossipMessage {
source: PeerId,
topics: Vec<TopicHash>,
Expand All @@ -47,20 +54,34 @@ impl NetworkBehaviourEventProcess<MdnsEvent> for ForestBehaviour {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.events.push(ForestBehaviourEvent::DiscoveredPeer(peer))
trace!("mdns: Discovered peer {}", peer.to_base58());
self.add_peer(peer);
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.events.push(ForestBehaviourEvent::ExpiredPeer(peer))
self.remove_peer(&peer);
}
}
}
}
}
}

impl NetworkBehaviourEventProcess<KademliaEvent> for ForestBehaviour {
fn inject_event(&mut self, event: KademliaEvent) {
match event {
KademliaEvent::Discovered { peer_id, .. } => {
self.add_peer(peer_id);
}
event => {
trace!("kad: {:?}", event);
}
}
}
}

impl NetworkBehaviourEventProcess<GossipsubEvent> for ForestBehaviour {
fn inject_event(&mut self, message: GossipsubEvent) {
if let GossipsubEvent::Message(_, _, message) = message {
Expand All @@ -77,14 +98,14 @@ impl NetworkBehaviourEventProcess<PingEvent> for ForestBehaviour {
fn inject_event(&mut self, event: PingEvent) {
match event.result {
Result::Ok(PingSuccess::Ping { rtt }) => {
debug!(
trace!(
"PingSuccess::Ping rtt to {} is {} ms",
event.peer.to_base58(),
rtt.as_millis()
);
}
Result::Ok(PingSuccess::Pong) => {
debug!("PingSuccess::Pong from {}", event.peer.to_base58());
trace!("PingSuccess::Pong from {}", event.peer.to_base58());
}
Result::Err(PingFailure::Timeout) => {
debug!("PingFailure::Timeout {}", event.peer.to_base58());
Expand All @@ -104,12 +125,12 @@ impl NetworkBehaviourEventProcess<IdentifyEvent> for ForestBehaviour {
info,
observed_addr,
} => {
debug!("Identified Peer {:?}", peer_id);
debug!("protocol_version {:}?", info.protocol_version);
debug!("agent_version {:?}", info.agent_version);
debug!("listening_ addresses {:?}", info.listen_addrs);
debug!("observed_address {:?}", observed_addr);
debug!("protocols {:?}", info.protocols);
trace!("Identified Peer {}", peer_id);
trace!("protocol_version {}", info.protocol_version);
trace!("agent_version {}", info.agent_version);
trace!("listening_ addresses {:?}", info.listen_addrs);
trace!("observed_address {}", observed_addr);
trace!("protocols {:?}", info.protocols);
}
IdentifyEvent::Sent { .. } => (),
IdentifyEvent::Error { .. } => (),
Expand Down Expand Up @@ -158,19 +179,51 @@ impl ForestBehaviour {
Poll::Pending
}

pub fn new(local_key: &Keypair) -> Self {
pub fn new(local_key: &Keypair, config: &Libp2pConfig, network_name: &str) -> Self {
let local_peer_id = local_key.public().into_peer_id();
let gossipsub_config = GossipsubConfig::default();

// Kademlia config
let store = MemoryStore::new(local_peer_id.to_owned());
let mut kad_config = KademliaConfig::default();
let network = format!("/fil/kad/{}/kad/1.0.0", network_name);
kad_config.set_protocol_name(network.as_bytes().to_vec());
let mut kademlia = Kademlia::with_config(local_peer_id.to_owned(), store, kad_config);
for multiaddr in config.bootstrap_peers.iter() {
let mut addr = multiaddr.to_owned();
if let Some(Protocol::P2p(mh)) = addr.pop() {
let peer_id = PeerId::from_multihash(mh).unwrap();
kademlia.add_address(&peer_id, addr);
} else {
warn!("Could not add addr {} to Kademlia DHT", multiaddr)
}
}
if let Err(e) = kademlia.bootstrap() {
warn!("Kademlia bootstrap failed: {}", e);
}

ForestBehaviour {
gossipsub: Gossipsub::new(local_peer_id, gossipsub_config),
mdns: Mdns::new().expect("Could not start mDNS"),
ping: Ping::default(),
identify: Identify::new("forest/libp2p".into(), "0.0.1".into(), local_key.public()),
identify: Identify::new(
"ipfs/0.1.0".into(),
// TODO update to include actual version
format!("forest-{}", "0.1.0"),
local_key.public(),
),
kademlia,
rpc: RPC::default(),
events: vec![],
peers: Default::default(),
}
}

/// Bootstrap Kademlia network
pub fn bootstrap(&mut self) -> Result<QueryId, String> {
self.kademlia.bootstrap().map_err(|e| e.to_string())
}

/// Publish data over the gossip network.
pub fn publish(&mut self, topic: &Topic, data: impl Into<Vec<u8>>) {
self.gossipsub.publish(topic, data);
Expand All @@ -185,4 +238,19 @@ impl ForestBehaviour {
pub fn send_rpc(&mut self, peer_id: PeerId, req: RPCEvent) {
self.rpc.send_rpc(peer_id, req);
}

/// Adds peer to the peer set.
pub fn add_peer(&mut self, peer_id: PeerId) {
self.peers.insert(peer_id);
}

/// Adds peer to the peer set.
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.remove(peer_id);
}

/// Adds peer to the peer set.
pub fn peers(&self) -> &HashSet<PeerId> {
&self.peers
}
}
47 changes: 25 additions & 22 deletions node/forest_libp2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use libp2p::gossipsub::Topic;
use libp2p::Multiaddr;
use serde::Deserialize;

const DEFAULT_BOOTSTRAP: &[&str] = &[
"/dns4/bootstrap-0-sin.fil-test.net/tcp/1347/p2p/12D3KooWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs",
"/ip4/86.109.15.57/tcp/1347/p2p/12D3KooWKNF7vNFEhnvB45E9mw2B5z6t419W3ziZPLdUDVnLLKGs",
"/dns4/bootstrap-0-dfw.fil-test.net/tcp/1347/p2p/12D3KooWECJTm7RUPyGfNbRwm6y2fK4wA7EB8rDJtWsq5AKi7iDr",
"/ip4/139.178.84.45/tcp/1347/p2p/12D3KooWECJTm7RUPyGfNbRwm6y2fK4wA7EB8rDJtWsq5AKi7iDr",
"/dns4/bootstrap-0-fra.fil-test.net/tcp/1347/p2p/12D3KooWC7MD6m7iNCuDsYtNr7xVtazihyVUizBbhmhEiyMAm9ym",
"/ip4/136.144.49.17/tcp/1347/p2p/12D3KooWC7MD6m7iNCuDsYtNr7xVtazihyVUizBbhmhEiyMAm9ym",
"/dns4/bootstrap-1-sin.fil-test.net/tcp/1347/p2p/12D3KooWD8eYqsKcEMFax6EbWN3rjA7qFsxCez2rmN8dWqkzgNaN",
"/ip4/86.109.15.55/tcp/1347/p2p/12D3KooWD8eYqsKcEMFax6EbWN3rjA7qFsxCez2rmN8dWqkzgNaN",
"/dns4/bootstrap-1-dfw.fil-test.net/tcp/1347/p2p/12D3KooWLB3RR8frLAmaK4ntHC2dwrAjyGzQgyUzWxAum1FxyyqD",
"/ip4/139.178.84.41/tcp/1347/p2p/12D3KooWLB3RR8frLAmaK4ntHC2dwrAjyGzQgyUzWxAum1FxyyqD",
"/dns4/bootstrap-1-fra.fil-test.net/tcp/1347/p2p/12D3KooWGPDJAw3HW4uVU3JEQBfFaZ1kdpg4HvvwRMVpUYbzhsLQ",
"/ip4/136.144.49.131/tcp/1347/p2p/12D3KooWGPDJAw3HW4uVU3JEQBfFaZ1kdpg4HvvwRMVpUYbzhsLQ",
];

#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct Libp2pConfig {
pub listening_multiaddr: String,
pub bootstrap_peers: Vec<String>,
#[serde(skip_deserializing)] // Always use default
pub pubsub_topics: Vec<Topic>,
}

impl Libp2pConfig {
/// Sets the pubsub topics to the network name provided
pub fn set_network_name(&mut self, s: &str) {
self.pubsub_topics = vec![
Topic::new(format!("/fil/blocks/{}", s)),
Topic::new(format!("/fil/msgs/{}", s)),
]
}
pub listening_multiaddr: Multiaddr,
pub bootstrap_peers: Vec<Multiaddr>,
}

impl Default for Libp2pConfig {
fn default() -> Self {
Libp2pConfig {
listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(),
pubsub_topics: vec![
Topic::new("/fil/blocks/interop".to_owned()),
Topic::new("/fil/msgs/interop".to_owned()),
],
bootstrap_peers: vec![],
let bootstrap_peers = DEFAULT_BOOTSTRAP
.iter()
.map(|node| node.parse().unwrap())
.collect();
Self {
listening_multiaddr: "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
bootstrap_peers,
}
}
}
Loading