Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
rewrite network protocol/service to use channels
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Jan 2, 2019
1 parent 50aa34f commit 3f82e52
Show file tree
Hide file tree
Showing 17 changed files with 1,086 additions and 659 deletions.
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,18 @@ pub trait Network: Clone {
}

/// Bridge between NetworkService, gossiping consensus messages and Grandpa
pub struct NetworkBridge<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> {
service: Arc<NetworkService<B, S, H>>
pub struct NetworkBridge<B: BlockT> {
service: Arc<NetworkService<B>>
}

impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> NetworkBridge<B, S, H> {
impl<B: BlockT> NetworkBridge<B> {
/// Create a new NetworkBridge to the given NetworkService
pub fn new(service: Arc<NetworkService<B, S, H>>) -> Self {
pub fn new(service: Arc<NetworkService<B>>) -> Self {
NetworkBridge { service }
}
}

impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> Clone for NetworkBridge<B, S, H> {
impl<B: BlockT> Clone for NetworkBridge<B> {
fn clone(&self) -> Self {
NetworkBridge {
service: Arc::clone(&self.service)
Expand All @@ -257,10 +257,10 @@ fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash {
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes())
}

impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> Network for NetworkBridge<B, S, H> {
impl<B: BlockT> Network for NetworkBridge<B> {
type In = mpsc::UnboundedReceiver<ConsensusMessage>;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
self.service.consensus_gossip().write().messages_for(message_topic::<B>(round, set_id))
self.service.consensus_gossip_messages_for(message_topic::<B>(round, set_id))
}

fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
Expand All @@ -270,11 +270,11 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT

fn drop_messages(&self, round: u64, set_id: u64) {
let topic = message_topic::<B>(round, set_id);
self.service.consensus_gossip().write().collect_garbage(|t| t == &topic);
self.service.consensus_gossip_collect_garbage_for(topic);
}

fn commit_messages(&self, set_id: u64) -> Self::In {
self.service.consensus_gossip().write().messages_for(commit_topic::<B>(set_id))
self.service.consensus_gossip_messages_for(commit_topic::<B>(set_id))
}

fn send_commit(&self, set_id: u64, message: Vec<u8>) {
Expand Down
18 changes: 6 additions & 12 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,8 @@ impl Network for MessageRouting {
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let mut gossip = peer.consensus_gossip().write();
let messages = peer.with_spec(move |_, _| {
gossip.messages_for(make_topic(round, set_id))
});
//let mut gossip = peer.consensus_gossip().write();
let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id));

let messages = messages.map_err(
move |_| panic!("Messages for round {} dropped too early", round)
Expand All @@ -200,19 +198,15 @@ impl Network for MessageRouting {
let topic = make_topic(round, set_id);
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let mut gossip = peer.consensus_gossip().write();
peer.with_spec(move |_, _| {
gossip.collect_garbage(|t| t == &topic)
});
//let mut gossip = peer.consensus_gossip().write();
peer.consensus_gossip_collect_garbage_for(topic);
}

fn commit_messages(&self, set_id: u64) -> Self::In {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let mut gossip = peer.consensus_gossip().write();
let messages = peer.with_spec(move |_, _| {
gossip.messages_for(make_commit_topic(set_id))
});
//let mut gossip = peer.consensus_gossip().write();
let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id));

let messages = messages.map_err(
move |_| panic!("Commit messages for set {} dropped too early", set_id)
Expand Down
1 change: 1 addition & 0 deletions core/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]

[dependencies]
crossbeam-channel = "0.3.4"
bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
Expand Down
3 changes: 2 additions & 1 deletion core/network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![recursion_limit = "128"]

extern crate crossbeam_channel;
extern crate parking_lot;
extern crate fnv;
extern crate futures;
Expand Down Expand Up @@ -55,7 +56,7 @@ pub use custom_proto::RegisteredProtocol;
pub use error::{Error, ErrorKind, DisconnectReason};
pub use libp2p::{Multiaddr, multiaddr::{Protocol}, multiaddr, PeerId};
pub use secret::obtain_private_key;
pub use service_task::{start_service, Service, ServiceEvent};
pub use service_task::{start_service, NetworkMsg, NetworkPort, Service, ServiceEvent};
pub use traits::{NetworkConfiguration, NodeIndex, NodeId, NonReservedPeerMode};
pub use traits::{ProtocolId, Secret, Severity};

Expand Down
Loading

0 comments on commit 3f82e52

Please sign in to comment.