Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Upgrade to libp2p-0.28. (#7077)
Browse files Browse the repository at this point in the history
* Upgrade to libp2p-0.28

* Clean up test imports.

* CI

* CI

* CI?

* CI once more.

* One more.

* CI

* CI

* CI
  • Loading branch information
romanb authored Sep 14, 2020
1 parent ea07951 commit 5f955c5
Show file tree
Hide file tree
Showing 24 changed files with 181 additions and 268 deletions.
229 changes: 84 additions & 145 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ members = [
#
# This list is ordered alphabetically.
[profile.dev.package]
aes-ctr = { opt-level = 3 }
aes-soft = { opt-level = 3 }
aesni = { opt-level = 3 }
blake2 = { opt-level = 3 }
Expand All @@ -217,7 +216,6 @@ crc32fast = { opt-level = 3 }
crossbeam-deque = { opt-level = 3 }
crossbeam-queue = { opt-level = 3 }
crypto-mac = { opt-level = 3 }
ctr = { opt-level = 3 }
curve25519-dalek = { opt-level = 3 }
ed25519-dalek = { opt-level = 3 }
evm-core = { opt-level = 3 }
Expand Down
2 changes: 1 addition & 1 deletion bin/node/browser-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"

[dependencies]
futures-timer = "3.0.2"
libp2p = { version = "0.24.0", default-features = false }
libp2p = { version = "0.28.1", default-features = false }
jsonrpc-core = "14.2.0"
serde = "1.0.106"
serde_json = "1.0.48"
Expand Down
2 changes: 1 addition & 1 deletion client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ derive_more = "0.99.2"
either = "1.5.3"
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.24.0", default-features = false, features = ["kad"] }
libp2p = { version = "0.28.1", default-features = false, features = ["kad"] }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc6"}
prost = "0.6.1"
Expand Down
2 changes: 1 addition & 1 deletion client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lazy_static = "1.4.0"
tokio = { version = "0.2.21", features = [ "signal", "rt-core", "rt-threaded", "blocking" ] }
futures = "0.3.4"
fdlimit = "0.2.0"
libp2p = "0.24.0"
libp2p = "0.28.1"
parity-scale-codec = "1.3.0"
hex = "0.4.2"
rand = "0.7.3"
Expand Down
2 changes: 1 addition & 1 deletion client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.24.0", default-features = false }
libp2p = { version = "0.28.1", default-features = false }
log = "0.4.8"
lru = "0.4.3"
sc-network = { version = "0.8.0-rc6", path = "../network" }
Expand Down
4 changes: 2 additions & 2 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ wasm-timer = "0.2"
zeroize = "1.0.0"

[dependencies.libp2p]
version = "0.24.0"
version = "0.28.1"
default-features = false
features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"]

[dev-dependencies]
assert_matches = "1.3"
env_logger = "0.7.0"
libp2p = { version = "0.24.0", default-features = false, features = ["secio"] }
libp2p = { version = "0.28.1", default-features = false }
quickcheck = "0.9.0"
rand = "0.7.2"
sp-keyring = { version = "2.0.0-rc6", path = "../../primitives/keyring" }
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/block_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ where
let mut cfg = OneShotHandlerConfig::default();
cfg.keep_alive_timeout = self.config.inactivity_timeout;
cfg.outbound_substream_timeout = self.config.request_timeout;
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg)
}

fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Expand Down
27 changes: 9 additions & 18 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,9 @@ mod tests {
use libp2p::{Multiaddr, PeerId};
use libp2p::core::upgrade;
use libp2p::core::transport::{Transport, MemoryTransport};
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::noise;
use libp2p::swarm::Swarm;
use libp2p::yamux;
use std::{collections::HashSet, task::Poll};
use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id};

Expand All @@ -779,25 +780,15 @@ mod tests {
// the first swarm via `with_user_defined`.
let mut swarms = (0..25).map(|i| {
let keypair = Keypair::generate_ed25519();
let keypair2 = keypair.clone();

let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&keypair)
.unwrap();

let transport = MemoryTransport
.and_then(move |out, endpoint| {
let secio = libp2p::secio::SecioConfig::new(keypair2);
libp2p::core::upgrade::apply(
out,
secio,
endpoint,
upgrade::Version::V1
)
})
.and_then(move |(peer_id, stream), endpoint| {
let peer_id2 = peer_id.clone();
let upgrade = libp2p::yamux::Config::default()
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
});
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(yamux::Config::default());

let behaviour = {
let mut config = DiscoveryConfig::new(keypair.public());
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/finality_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ where
};
let mut cfg = OneShotHandlerConfig::default();
cfg.keep_alive_timeout = self.config.inactivity_timeout;
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg)
}

fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/light_client_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ where
};
let mut cfg = OneShotHandlerConfig::default();
cfg.keep_alive_timeout = self.config.inactivity_timeout;
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg)
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
Expand Down
26 changes: 15 additions & 11 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,25 +394,27 @@ impl ProtocolsHandler for NotifsHandler {
type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>;
// Index within the `out_handlers`; None for legacy
type OutboundOpenInfo = Option<usize>;
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
let in_handlers = self.in_handlers.iter()
.map(|(h, _)| h.listen_protocol().into_upgrade().1)
.collect::<UpgradeCollec<_>>();

let proto = SelectUpgrade::new(in_handlers, self.legacy.listen_protocol().into_upgrade().1);
SubstreamProtocol::new(proto)
SubstreamProtocol::new(proto, ())
}

fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
match out {
EitherOutput::First((out, num)) =>
self.in_handlers[num].0.inject_fully_negotiated_inbound(out),
self.in_handlers[num].0.inject_fully_negotiated_inbound(out, ()),
EitherOutput::Second(out) =>
self.legacy.inject_fully_negotiated_inbound(out),
self.legacy.inject_fully_negotiated_inbound(out, ()),
}
}

Expand Down Expand Up @@ -619,10 +621,11 @@ impl ProtocolsHandler for NotifsHandler {
if self.pending_legacy_handshake.is_none() {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
protocol: protocol
.map_upgrade(EitherUpgrade::B)
.map_info(|()| None)
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
received_handshake,
Expand Down Expand Up @@ -705,10 +708,11 @@ impl ProtocolsHandler for NotifsHandler {
for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() {
while let Poll::Ready(ev) = handler.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::A),
info: Some(handler_num),
protocol: protocol
.map_upgrade(EitherUpgrade::A)
.map_info(|()| Some(handler_num))
}),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),

Expand Down
14 changes: 7 additions & 7 deletions client/network/src/protocol/generic_proto/handler/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ impl LegacyProtoHandler {
if incoming.is_empty() {
if let ConnectedPoint::Dialer { .. } = self.endpoint {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
protocol: SubstreamProtocol::new(self.protocol.clone(), ()),
});
}
ProtocolState::Opening {
Expand Down Expand Up @@ -428,8 +427,7 @@ impl LegacyProtoHandler {
deadline: Delay::new(Duration::from_secs(60))
};
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
protocol: SubstreamProtocol::new(self.protocol.clone(), ()),
})
} else {
self.state = ProtocolState::Disabled { shutdown, reenable };
Expand Down Expand Up @@ -498,14 +496,16 @@ impl ProtocolsHandler for LegacyProtoHandler {
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.protocol.clone())
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.protocol.clone(), ())
}

fn inject_fully_negotiated_inbound(
&mut self,
(substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
(substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
self.inject_fully_negotiated(substream, handshake);
}
Expand Down
8 changes: 5 additions & 3 deletions client/network/src/protocol/generic_proto/handler/notif_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,16 @@ impl ProtocolsHandler for NotifsInHandler {
type InboundProtocol = NotificationsIn;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.in_protocol.clone())
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.in_protocol.clone(), ())
}

fn inject_fully_negotiated_inbound(
&mut self,
(msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
(msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
// If a substream already exists, we drop it and replace it with the new incoming one.
if self.substream.is_some() {
Expand Down
17 changes: 8 additions & 9 deletions client/network/src/protocol/generic_proto/handler/notif_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,16 @@ impl ProtocolsHandler for NotifsOutHandler {
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = NotificationsOut;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(DeniedUpgrade)
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(DeniedUpgrade, ())
}

fn inject_fully_negotiated_inbound(
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
// We should never reach here. `proto` is a `Void`.
void::unreachable(proto)
Expand Down Expand Up @@ -309,8 +311,7 @@ impl ProtocolsHandler for NotifsOutHandler {
State::Disabled => {
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
self.state = State::Opening { initial_message };
},
Expand All @@ -329,8 +330,7 @@ impl ProtocolsHandler for NotifsOutHandler {

let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
self.state = State::Opening { initial_message };
},
Expand Down Expand Up @@ -414,8 +414,7 @@ impl ProtocolsHandler for NotifsOutHandler {
self.state = State::Opening { initial_message: initial_message.clone() };
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed));
}
Expand Down
40 changes: 19 additions & 21 deletions client/network/src/protocol/generic_proto/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ use crate::protocol::generic_proto::{GenericProto, GenericProtoOut};

use futures::prelude::*;
use libp2p::{PeerId, Multiaddr, Transport};
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint};
use libp2p::core::{
connection::{ConnectionId, ListenerId},
ConnectedPoint,
muxing,
transport::MemoryTransport,
upgrade
};
use libp2p::{identity, noise, yamux};
use libp2p::swarm::{
Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters,
NetworkBehaviour, NetworkBehaviourAction
Expand All @@ -32,33 +39,24 @@ use std::{error, io, task::Context, task::Poll, time::Duration};
fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
let mut out = Vec::with_capacity(2);

let keypairs: Vec<_> = (0..2).map(|_| libp2p::identity::Keypair::generate_ed25519()).collect();
let keypairs: Vec<_> = (0..2).map(|_| identity::Keypair::generate_ed25519()).collect();
let addrs: Vec<Multiaddr> = (0..2)
.map(|_| format!("/memory/{}", rand::random::<u64>()).parse().unwrap())
.collect();

for index in 0 .. 2 {
let keypair = keypairs[index].clone();
let local_peer_id = keypair.public().into_peer_id();
let transport = libp2p::core::transport::MemoryTransport
.and_then(move |out, endpoint| {
let secio = libp2p::secio::SecioConfig::new(keypair);
libp2p::core::upgrade::apply(
out,
secio,
endpoint,
libp2p::core::upgrade::Version::V1
)
})
.and_then(move |(peer_id, stream), endpoint| {
libp2p::core::upgrade::apply(
stream,
libp2p::yamux::Config::default(),
endpoint,
libp2p::core::upgrade::Version::V1
)
.map_ok(|muxer| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)))
})

let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&keypair)
.unwrap();

let transport = MemoryTransport
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(yamux::Config::default())
.map(|(peer, muxer), _| (peer, muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
Expand Down
Loading

0 comments on commit 5f955c5

Please sign in to comment.