Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Revert "Update networking code to libp2p 0.14 (#4383)"
Browse files Browse the repository at this point in the history
This reverts commit 1ec08e8.
  • Loading branch information
andresilva committed Jan 17, 2020
1 parent 8de5340 commit 7da697b
Show file tree
Hide file tree
Showing 29 changed files with 803 additions and 839 deletions.
592 changes: 302 additions & 290 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ macro_rules! new_full {
($config:expr, $with_startup_data: expr) => {{
use futures::{
prelude::*,
compat::Future01CompatExt
compat::{Future01CompatExt, Stream01CompatExt},
};
use sc_network::Event;

Expand Down Expand Up @@ -180,8 +180,9 @@ macro_rules! new_full {
service.spawn_essential_task(babe);

let network = service.network();
let dht_event_stream = network.event_stream().filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
let network_event_stream = network.event_stream().compat();
let dht_event_stream = network_event_stream.filter_map(|e| async move { match e {
Ok(Event::Dht(e)) => Some(e),
_ => None,
}}).boxed();
let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new(
Expand Down
2 changes: 1 addition & 1 deletion client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ codec = { package = "parity-scale-codec", default-features = false, version = "1
derive_more = "0.99.2"
futures = "0.3.1"
futures-timer = "2.0"
libp2p = { version = "0.14.0-alpha.1", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
libp2p = { version = "0.13.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4.8"
prost = "0.5.0"
rand = "0.7.2"
Expand Down
2 changes: 1 addition & 1 deletion client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ futures01 = { package = "futures", version = "0.1.29" }
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "0.4.0"
lru = "0.1.2"
libp2p = { version = "0.14.0-alpha.1", default-features = false, features = ["libp2p-websocket"] }
libp2p = { version = "0.13.2", default-features = false, features = ["libp2p-websocket"] }
sc-network = { version = "0.8", path = "../network" }
parking_lot = "0.9.0"
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
3 changes: 1 addition & 2 deletions client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub use self::bridge::GossipEngine;
pub use self::state_machine::TopicNotification;
pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext, ValidationResult};

use futures::prelude::*;
use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::sync::Arc;
Expand Down Expand Up @@ -98,7 +97,7 @@ pub trait Network<B: BlockT> {

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<NetworkService<B, S, H>> {
fn event_stream(&self) -> Box<dyn futures01::Stream<Item = Event, Error = ()> + Send> {
Box::new(NetworkService::event_stream(self).map(|v| Ok::<_, ()>(v)).compat())
Box::new(NetworkService::event_stream(self))
}

fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
Expand Down
15 changes: 9 additions & 6 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
bytes = "0.5.0"
bytes = "0.4.12"
derive_more = "0.99.2"
either = "1.5.3"
log = "0.4.8"
parking_lot = "0.9.0"
bitflags = "1.2.0"
fnv = "1.0.6"
futures = "0.3.1"
futures_codec = "0.3.3"
futures = "0.1.29"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures-timer = "0.4.0"
linked-hash-map = "0.5.2"
linked_hash_set = "0.1.3"
lru = "0.4.0"
rustc-hex = "2.0.1"
rand = "0.7.2"
libp2p = { version = "0.14.0-alpha.1", default-features = false, features = ["libp2p-websocket"] }
libp2p = { version = "0.13.2", default-features = false, features = ["libp2p-websocket"] }
fork-tree = { version = "2.0.0", path = "../../utils/fork-tree" }
sp-consensus = { version = "0.8", path = "../../primitives/consensus/common" }
sc-client = { version = "0.8", path = "../" }
Expand All @@ -39,7 +39,9 @@ serde_json = "1.0.41"
slog = { version = "2.5.2", features = ["nested-values"] }
slog_derive = "0.2.0"
smallvec = "0.6.10"
unsigned-varint = { version = "0.3.0", features = ["codec"] }
tokio-io = "0.1.12"
tokio = { version = "0.1.22", optional = true }
unsigned-varint = { version = "0.2.2", features = ["codec"] }
sp-keyring = { version = "2.0.0", optional = true, path = "../../primitives/keyring" }
substrate-test-client = { version = "2.0.0", optional = true, path = "../../test-utils/client" }
substrate-test-runtime-client = { version = "2.0.0", optional = true, path = "../../test-utils/runtime/client" }
Expand All @@ -55,7 +57,8 @@ sp-keyring = { version = "2.0.0", path = "../../primitives/keyring" }
quickcheck = "0.9.0"
rand = "0.7.2"
tempfile = "3.1.0"
tokio = "0.1.22"

[features]
default = []
test-helpers = ["sp-keyring", "substrate-test-runtime-client"]
test-helpers = ["sp-keyring", "substrate-test-runtime-client", "tokio"]
13 changes: 7 additions & 6 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
};
use crate::{ExHashT, specialization::NetworkSpecialization};
use crate::protocol::{CustomMessageOutcome, Protocol};
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::kad::record;
Expand All @@ -28,7 +29,7 @@ use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox};
use log::{debug, warn};
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification};
use std::{iter, task::Context, task::Poll};
use std::iter;
use void;

/// General behaviour of the network. Combines all protocols together.
Expand Down Expand Up @@ -58,7 +59,7 @@ pub enum BehaviourOut<B: BlockT> {

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
/// Builds a new `Behaviour`.
pub async fn new(
pub fn new(
substrate: Protocol<B, S, H>,
user_agent: String,
local_public_key: PublicKey,
Expand All @@ -74,7 +75,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
known_addresses,
enable_mdns,
allow_private_ipv4
).await,
),
events: Vec::new(),
}
}
Expand Down Expand Up @@ -211,11 +212,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventPr
}

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
fn poll<TEv>(&mut self, _: &mut Context) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
if !self.events.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
}

Poll::Pending
Async::NotReady
}
}
63 changes: 31 additions & 32 deletions client/network/src/debug_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use fnv::FnvHashMap;
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use libp2p::Multiaddr;
use libp2p::core::{ConnectedPoint, either::EitherOutput, PeerId, PublicKey};
use libp2p::swarm::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler};
Expand All @@ -24,9 +25,8 @@ use libp2p::identify::{Identify, IdentifyEvent, IdentifyInfo};
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
use log::{debug, trace, error};
use std::collections::hash_map::Entry;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use crate::utils::interval;

/// Time after we disconnect from a node before we purge its information from the cache.
Expand All @@ -44,7 +44,7 @@ pub struct DebugInfoBehaviour<TSubstream> {
/// Information that we know about all nodes.
nodes_info: FnvHashMap<PeerId, NodeInfo>,
/// Interval at which we perform garbage collection in `nodes_info`.
garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>,
garbage_collect: Box<dyn Stream<Item = (), Error = ()> + Send>,
}

/// Information about a node we're connected to.
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<TSubstream> DebugInfoBehaviour<TSubstream> {
ping: Ping::new(PingConfig::new()),
identify,
nodes_info: FnvHashMap::default(),
garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)),
garbage_collect: Box::new(interval(GARBAGE_COLLECT_INTERVAL).map(|()| Ok(())).compat()),
}
}

Expand Down Expand Up @@ -149,7 +149,7 @@ pub enum DebugInfoEvent {
}

impl<TSubstream> NetworkBehaviour for DebugInfoBehaviour<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static {
where TSubstream: AsyncRead + AsyncWrite {
type ProtocolsHandler = IntoProtocolsHandlerSelect<
<Ping<TSubstream> as NetworkBehaviour>::ProtocolsHandler,
<Identify<TSubstream> as NetworkBehaviour>::ProtocolsHandler
Expand Down Expand Up @@ -253,71 +253,70 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static {

fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters
) -> Poll<
) -> Async<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
> {
loop {
match self.ping.poll(cx, params) {
Poll::Pending => break,
Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
match self.ping.poll(params) {
Async::NotReady => break,
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
if let PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } = ev {
self.handle_ping_report(&peer, rtt)
}
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Poll::Ready(NetworkBehaviourAction::SendEvent {
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id,
event: EitherOutput::First(event)
}),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}

loop {
match self.identify.poll(cx, params) {
Poll::Pending => break,
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
match self.identify.poll(params) {
Async::NotReady => break,
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
match event {
IdentifyEvent::Received { peer_id, info, .. } => {
self.handle_identify_report(&peer_id, &info);
let event = DebugInfoEvent::Identified { peer_id, info };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
IdentifyEvent::Error { peer_id, error } =>
debug!(target: "sub-libp2p", "Identification with peer {:?} failed => {}", peer_id, error),
IdentifyEvent::Sent { .. } => {}
}
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Poll::Ready(NetworkBehaviourAction::SendEvent {
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id,
event: EitherOutput::Second(event)
}),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}

while let Poll::Ready(Some(())) = self.garbage_collect.poll_next_unpin(cx) {
while let Ok(Async::Ready(Some(_))) = self.garbage_collect.poll() {
self.nodes_info.retain(|_, node| {
node.info_expire.as_ref().map(|exp| *exp >= Instant::now()).unwrap_or(true)
});
}

Poll::Pending
Async::NotReady
}
}
Loading

0 comments on commit 7da697b

Please sign in to comment.