diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8ed4ae9bed4..a5ac3e8734c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ on: jobs: test-desktop: name: Build and test - runs-on: ubuntu-latest + runs-on: ubuntu-18.04 strategy: matrix: args: [ @@ -34,7 +34,7 @@ jobs: test-wasm: name: Build on WASM - runs-on: ubuntu-latest + runs-on: ubuntu-18.04 strategy: matrix: toolchain: [ @@ -83,7 +83,7 @@ jobs: check-rustdoc-links: name: Check rustdoc intra-doc links - runs-on: ubuntu-latest + runs-on: ubuntu-18.04 container: image: rust steps: @@ -101,7 +101,7 @@ jobs: run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items check-clippy: - runs-on: ubuntu-latest + runs-on: ubuntu-18.04 steps: - name: Cancel Previous Runs @@ -127,7 +127,7 @@ jobs: integration-test: name: Integration tests - runs-on: ubuntu-latest + runs-on: ubuntu-18.04 container: image: rust steps: @@ -145,7 +145,7 @@ jobs: run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad rustfmt: - runs-on: ubuntu-latest + runs-on: ubuntu-18.04 steps: - name: Cancel Previous Runs diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index eacb3d734b2..67c5ba39f29 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -7,8 +7,11 @@ - Migrate to Rust edition 2021 (see [PR 2339]). +- Add support for ECDSA identities (see [PR 2352]). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 -[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350/ +[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 +[PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352 # 0.30.1 [2021-11-16] diff --git a/core/Cargo.toml b/core/Cargo.toml index 3fbad0c70be..60c348fe258 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -25,12 +25,13 @@ log = "0.4" multiaddr = { version = "0.13.0" } multihash = { version = "0.14", default-features = false, features = ["std", "multihash-impl", "identity", "sha2"] } multistream-select = { version = "0.11", path = "../misc/multistream-select" } +p256 = { version = "0.10.0", default-features = false, features = ["ecdsa"], optional = true } parking_lot = "0.11.0" pin-project = "1.0.0" prost = "0.9" rand = "0.8" rw-stream-sink = "0.2.0" -sha2 = "0.9.1" +sha2 = "0.10.0" smallvec = "1.6.1" thiserror = "1.0" unsigned-varint = "0.7" @@ -55,8 +56,9 @@ rand07 = { package = "rand", version = "0.7" } prost-build = "0.9" [features] -default = ["secp256k1"] -secp256k1 = ["libsecp256k1"] +default = [ "secp256k1", "ecdsa" ] +secp256k1 = [ "libsecp256k1" ] +ecdsa = [ "p256" ] [[bench]] name = "peer_id" diff --git a/core/src/identity.rs b/core/src/identity.rs index 76ed4c39fd4..054138f88a4 100644 --- a/core/src/identity.rs +++ b/core/src/identity.rs @@ -32,6 +32,8 @@ //! (e.g. [ed25519 binary format](https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.5)). //! All key types have functions to enable conversion to/from their binary representations. +#[cfg(feature = "ecdsa")] +pub mod ecdsa; pub mod ed25519; #[cfg(not(target_arch = "wasm32"))] pub mod rsa; @@ -71,6 +73,9 @@ pub enum Keypair { /// A Secp256k1 keypair. #[cfg(feature = "secp256k1")] Secp256k1(secp256k1::Keypair), + /// An ECDSA keypair. + #[cfg(feature = "ecdsa")] + Ecdsa(ecdsa::Keypair), } impl Keypair { @@ -85,6 +90,12 @@ impl Keypair { Keypair::Secp256k1(secp256k1::Keypair::generate()) } + /// Generate a new ECDSA keypair. + #[cfg(feature = "ecdsa")] + pub fn generate_ecdsa() -> Keypair { + Keypair::Ecdsa(ecdsa::Keypair::generate()) + } + /// Decode an keypair from a DER-encoded secret key in PKCS#8 PrivateKeyInfo /// format (i.e. unencrypted) as defined in [RFC5208]. /// @@ -114,6 +125,8 @@ impl Keypair { Rsa(ref pair) => pair.sign(msg), #[cfg(feature = "secp256k1")] Secp256k1(ref pair) => pair.secret().sign(msg), + #[cfg(feature = "ecdsa")] + Ecdsa(ref pair) => Ok(pair.secret().sign(msg)), } } @@ -126,6 +139,8 @@ impl Keypair { Rsa(pair) => PublicKey::Rsa(pair.public()), #[cfg(feature = "secp256k1")] Secp256k1(pair) => PublicKey::Secp256k1(pair.public().clone()), + #[cfg(feature = "ecdsa")] + Ecdsa(pair) => PublicKey::Ecdsa(pair.public().clone()), } } @@ -150,6 +165,12 @@ impl Keypair { "Encoding Secp256k1 key into Protobuf is unsupported", )) } + #[cfg(feature = "ecdsa")] + Self::Ecdsa(_) => { + return Err(DecodingError::new( + "Encoding ECDSA key into Protobuf is unsupported", + )) + } }; Ok(pk.encode_to_vec()) @@ -177,6 +198,9 @@ impl Keypair { keys_proto::KeyType::Secp256k1 => Err(DecodingError::new( "Decoding Secp256k1 key from Protobuf is unsupported.", )), + keys_proto::KeyType::Ecdsa => Err(DecodingError::new( + "Decoding ECDSA key from Protobuf is unsupported.", + )), } } } @@ -199,6 +223,9 @@ pub enum PublicKey { #[cfg(feature = "secp256k1")] /// A public Secp256k1 key. Secp256k1(secp256k1::PublicKey), + /// A public ECDSA key. + #[cfg(feature = "ecdsa")] + Ecdsa(ecdsa::PublicKey), } impl PublicKey { @@ -215,6 +242,8 @@ impl PublicKey { Rsa(pk) => pk.verify(msg, sig), #[cfg(feature = "secp256k1")] Secp256k1(pk) => pk.verify(msg, sig), + #[cfg(feature = "ecdsa")] + Ecdsa(pk) => pk.verify(msg, sig), } } @@ -266,6 +295,11 @@ impl From<&PublicKey> for keys_proto::PublicKey { r#type: keys_proto::KeyType::Secp256k1 as i32, data: key.encode().to_vec(), }, + #[cfg(feature = "ecdsa")] + PublicKey::Ecdsa(key) => keys_proto::PublicKey { + r#type: keys_proto::KeyType::Ecdsa as i32, + data: key.encode_der(), + }, } } } @@ -299,6 +333,15 @@ impl TryFrom for PublicKey { log::debug!("support for secp256k1 was disabled at compile-time"); Err(DecodingError::new("Unsupported")) } + #[cfg(feature = "ecdsa")] + keys_proto::KeyType::Ecdsa => { + ecdsa::PublicKey::decode_der(&pubkey.data).map(PublicKey::Ecdsa) + } + #[cfg(not(feature = "ecdsa"))] + keys_proto::KeyType::Ecdsa => { + log::debug!("support for ECDSA was disabled at compile-time"); + Err(DecodingError::new("Unsupported")) + } } } } diff --git a/core/src/identity/ecdsa.rs b/core/src/identity/ecdsa.rs new file mode 100644 index 00000000000..b883243b13b --- /dev/null +++ b/core/src/identity/ecdsa.rs @@ -0,0 +1,245 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! ECDSA keys with secp256r1 curve support. + +use super::error::DecodingError; +use core::fmt; +use p256::{ + ecdsa::{ + signature::{Signer, Verifier}, + Signature, SigningKey, VerifyingKey, + }, + EncodedPoint, +}; + +/// An ECDSA keypair. +#[derive(Clone)] +pub struct Keypair { + secret: SecretKey, + public: PublicKey, +} + +impl Keypair { + /// Generate a new random ECDSA keypair. + pub fn generate() -> Keypair { + Keypair::from(SecretKey::generate()) + } + + /// Sign a message using the private key of this keypair. + pub fn sign(&self, msg: &[u8]) -> Vec { + self.secret.sign(msg) + } + + /// Get the public key of this keypair. + pub fn public(&self) -> &PublicKey { + &self.public + } + + /// Get the secret key of this keypair. + pub fn secret(&self) -> &SecretKey { + &self.secret + } +} + +impl fmt::Debug for Keypair { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Keypair") + .field("public", &self.public()) + .finish() + } +} + +/// Promote an ECDSA secret key into a keypair. +impl From for Keypair { + fn from(secret: SecretKey) -> Keypair { + let public = PublicKey(VerifyingKey::from(&secret.0)); + Keypair { secret, public } + } +} + +/// Demote an ECDSA keypair to a secret key. +impl From for SecretKey { + fn from(kp: Keypair) -> SecretKey { + kp.secret + } +} + +/// An ECDSA secret key. +#[derive(Clone)] +pub struct SecretKey(SigningKey); + +impl SecretKey { + /// Generate a new random ECDSA secret key. + pub fn generate() -> SecretKey { + SecretKey(SigningKey::random(rand::thread_rng())) + } + + /// Sign a message with this secret key, producing a DER-encoded ECDSA signature. + pub fn sign(&self, msg: &[u8]) -> Vec { + self.0.sign(msg).to_der().as_bytes().to_owned() + } + + /// Encode a secret key into a byte buffer. + pub fn to_bytes(&self) -> Vec { + self.0.to_bytes().to_vec() + } + + /// Decode a secret key from a byte buffer. + pub fn from_bytes(buf: &[u8]) -> Result { + SigningKey::from_bytes(buf) + .map_err(|err| DecodingError::new("failed to parse ecdsa p256 secret key").source(err)) + .map(SecretKey) + } +} + +impl fmt::Debug for SecretKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SecretKey") + } +} + +/// An ECDSA public key. +#[derive(Clone, PartialEq, Eq)] +pub struct PublicKey(VerifyingKey); + +impl PublicKey { + /// Verify an ECDSA signature on a message using the public key. + pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool { + let sig = match Signature::from_der(sig) { + Ok(sig) => sig, + Err(_) => return false, + }; + self.0.verify(msg, &sig).is_ok() + } + + /// Decode a public key from a byte buffer without compression. + pub fn from_bytes(k: &[u8]) -> Result { + let enc_pt = EncodedPoint::from_bytes(k).map_err(|_| { + DecodingError::new("failed to parse ecdsa p256 public key, bad point encoding") + })?; + + VerifyingKey::from_encoded_point(&enc_pt) + .map_err(|err| DecodingError::new("failed to parse ecdsa p256 public key").source(err)) + .map(PublicKey) + } + + /// Encode a public key into a byte buffer without compression. + pub fn to_bytes(&self) -> Vec { + self.0.to_encoded_point(false).as_bytes().to_owned() + } + + /// Encode a public key into a DER encoded byte buffer as defined by SEC1 standard. + pub fn encode_der(&self) -> Vec { + let buf = self.to_bytes(); + Self::add_asn1_header(&buf) + } + + /// Decode a public key into a DER encoded byte buffer as defined by SEC1 standard. + pub fn decode_der(k: &[u8]) -> Result { + let buf = Self::del_asn1_header(k).ok_or_else(|| { + DecodingError::new("failed to parse asn.1 encoded ecdsa p256 public key") + })?; + Self::from_bytes(&buf) + } + + // ecPublicKey (ANSI X9.62 public key type) OID: 1.2.840.10045.2.1 + const EC_PUBLIC_KEY_OID: [u8; 9] = [0x06, 0x07, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x02, 0x01]; + // secp256r1 OID: 1.2.840.10045.3.1.7 + const SECP_256_R1_OID: [u8; 10] = [0x06, 0x08, 0x2A, 0x86, 0x48, 0xCE, 0x3D, 0x03, 0x01, 0x07]; + + // Add ASN1 header. + fn add_asn1_header(key_buf: &[u8]) -> Vec { + // ASN.1 struct type and length. + let mut asn1_buf = vec![ + 0x30, + 0x00, + 0x30, + (Self::EC_PUBLIC_KEY_OID.len() + Self::SECP_256_R1_OID.len()) as u8, + ]; + // Append OIDs. + asn1_buf.extend_from_slice(&Self::EC_PUBLIC_KEY_OID); + asn1_buf.extend_from_slice(&Self::SECP_256_R1_OID); + // Append key bitstring type and length. + asn1_buf.extend_from_slice(&[0x03, (key_buf.len() + 1) as u8, 0x00]); + // Append key bitstring value. + asn1_buf.extend_from_slice(key_buf); + // Update overall length field. + asn1_buf[1] = (asn1_buf.len() - 2) as u8; + + asn1_buf + } + + // Check and remove ASN.1 header. + fn del_asn1_header(asn1_buf: &[u8]) -> Option<&[u8]> { + let oids_len = Self::EC_PUBLIC_KEY_OID.len() + Self::SECP_256_R1_OID.len(); + let asn1_head = asn1_buf.get(..4)?; + let oids_buf = asn1_buf.get(4..4 + oids_len)?; + let bitstr_head = asn1_buf.get(4 + oids_len..4 + oids_len + 3)?; + + // Sanity check + if asn1_head[0] != 0x30 + || asn1_head[2] != 0x30 + || asn1_head[3] as usize != oids_len + || &oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != &Self::EC_PUBLIC_KEY_OID + || &oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != &Self::SECP_256_R1_OID + || bitstr_head[0] != 0x03 + || bitstr_head[2] != 0x00 + { + return None; + } + + let key_len = bitstr_head[1].checked_sub(1)? as usize; + let key_buf = asn1_buf.get(4 + oids_len + 3..4 + oids_len + 3 + key_len as usize)?; + Some(key_buf) + } +} + +impl fmt::Debug for PublicKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PublicKey(asn.1 uncompressed): ")?; + for byte in &self.encode_der() { + write!(f, "{:x}", byte)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sign_verify() { + let pair = Keypair::generate(); + let pk = pair.public(); + + let msg = "hello world".as_bytes(); + let sig = pair.sign(msg); + assert!(pk.verify(msg, &sig)); + + let mut invalid_sig = sig.clone(); + invalid_sig[3..6].copy_from_slice(&[10, 23, 42]); + assert!(!pk.verify(msg, &invalid_sig)); + + let invalid_msg = "h3ll0 w0rld".as_bytes(); + assert!(!pk.verify(invalid_msg, &sig)); + } +} diff --git a/core/src/keys.proto b/core/src/keys.proto index 0a9f1f08ca3..5fbeaf8f6e0 100644 --- a/core/src/keys.proto +++ b/core/src/keys.proto @@ -6,6 +6,7 @@ enum KeyType { RSA = 0; Ed25519 = 1; Secp256k1 = 2; + ECDSA = 3; } message PublicKey { diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index cb3d0379064..3f06ad2684f 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -83,7 +83,7 @@ where role_override: Endpoint::Dialer, }, )), - marker: PhantomPinned, + _marker: PhantomPinned, }; Ok(future) } @@ -105,7 +105,7 @@ where role_override: Endpoint::Listener, }, )), - marker: PhantomPinned, + _marker: PhantomPinned, }; Ok(future) } @@ -160,7 +160,7 @@ where upgrade: AndThenFuture { inner: Either::Left(Box::pin(upgrade)), args: Some((this.fun.clone(), point)), - marker: PhantomPinned, + _marker: PhantomPinned, }, local_addr, remote_addr, @@ -187,7 +187,7 @@ where pub struct AndThenFuture { inner: Either>, Pin>>, args: Option<(TMap, ConnectedPoint)>, - marker: PhantomPinned, + _marker: PhantomPinned, } impl Future for AndThenFuture diff --git a/examples/chat.rs b/examples/chat.rs index bdafde20a19..907b43b1eb4 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -50,7 +50,10 @@ //! The two nodes then connect. use async_std::{io, task}; -use futures::{future, prelude::*}; +use futures::{ + prelude::{stream::StreamExt, *}, + select, +}; use libp2p::{ floodsub::{self, Floodsub, FloodsubEvent}, identity, @@ -58,10 +61,7 @@ use libp2p::{ swarm::SwarmEvent, Multiaddr, NetworkBehaviour, PeerId, Swarm, }; -use std::{ - error::Error, - task::{Context, Poll}, -}; +use std::error::Error; #[async_std::main] async fn main() -> Result<(), Box> { @@ -133,40 +133,34 @@ async fn main() -> Result<(), Box> { } // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Listen on all interfaces and whatever port the OS assigns swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm - .behaviour_mut() - .floodsub - .publish(floodsub_topic.clone(), line.as_bytes()), - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } - } - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => { + loop { + select! { + line = stdin.select_next_some() => swarm + .behaviour_mut() + .floodsub + .publish(floodsub_topic.clone(), line.expect("Stdin not to close").as_bytes()), + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { println!("Listening on {:?}", address); } - Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Floodsub( - FloodsubEvent::Message(message), - )))) => { + SwarmEvent::Behaviour(OutEvent::Floodsub( + FloodsubEvent::Message(message) + )) => { println!( "Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source ); } - Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns( - MdnsEvent::Discovered(list), - )))) => { + SwarmEvent::Behaviour(OutEvent::Mdns( + MdnsEvent::Discovered(list) + )) => { for (peer, _) in list { swarm .behaviour_mut() @@ -174,9 +168,9 @@ async fn main() -> Result<(), Box> { .add_node_to_partial_view(peer); } } - Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired( - list, - ))))) => { + SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired( + list + ))) => { for (peer, _) in list { if !swarm.behaviour_mut().mdns.has_node(&peer) { swarm @@ -185,12 +179,9 @@ async fn main() -> Result<(), Box> { .remove_node_from_partial_view(&peer); } } - } - Poll::Ready(Some(_)) => {} - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => break, + }, + _ => {} } } - Poll::Pending - })) + } } diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index cea3ca2a56b..3f92ebbe571 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -41,7 +41,7 @@ //! 4. Close with Ctrl-c. use async_std::{io, task}; -use futures::prelude::*; +use futures::{prelude::*, select}; use libp2p::kad::record::store::MemoryStore; use libp2p::kad::{ record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult, @@ -53,10 +53,7 @@ use libp2p::{ swarm::{NetworkBehaviourEventProcess, SwarmEvent}, NetworkBehaviour, PeerId, Swarm, }; -use std::{ - error::Error, - task::{Context, Poll}, -}; +use std::error::Error; #[async_std::main] async fn main() -> Result<(), Box> { @@ -157,35 +154,23 @@ async fn main() -> Result<(), Box> { }; // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Listen on all interfaces and whatever port the OS assigns. swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off. - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => { - handle_input_line(&mut swarm.behaviour_mut().kademlia, line) - } - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } - } - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - if let SwarmEvent::NewListenAddr { address, .. } = event { - println!("Listening on {:?}", address); - } - } - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => break, + loop { + select! { + line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")), + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening in {:?}", address); + }, + _ => {} } } - Poll::Pending - })) + } } fn handle_input_line(kademlia: &mut Kademlia, line: String) { diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index fdd477bfe4b..976fcafb470 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -46,21 +46,18 @@ //! //! The two nodes should then connect. -use async_std::{io, task}; +use async_std::io; use env_logger::{Builder, Env}; -use futures::prelude::*; +use futures::{prelude::*, select}; use libp2p::gossipsub::MessageId; use libp2p::gossipsub::{ GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode, }; use libp2p::{gossipsub, identity, swarm::SwarmEvent, Multiaddr, PeerId}; use std::collections::hash_map::DefaultHasher; +use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::Duration; -use std::{ - error::Error, - task::{Context, Poll}, -}; #[async_std::main] async fn main() -> Result<(), Box> { @@ -130,44 +127,35 @@ async fn main() -> Result<(), Box> { } // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Kick it off - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - if let Err(e) = match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm + loop { + select! { + line = stdin.select_next_some() => { + if let Err(e) = swarm .behaviour_mut() - .publish(topic.clone(), line.as_bytes()), - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } { - println!("Publish error: {:?}", e); - } - } - - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - SwarmEvent::Behaviour(GossipsubEvent::Message { - propagation_source: peer_id, - message_id: id, - message, - }) => println!( - "Got message: {} with id: {} from peer: {:?}", - String::from_utf8_lossy(&message.data), - id, - peer_id - ), - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening on {:?}", address); - } - _ => {} - }, - Poll::Ready(None) | Poll::Pending => break, + .publish(topic.clone(), line.expect("Stdin not to close").as_bytes()) + { + println!("Publish error: {:?}", e); + } + }, + event = swarm.select_next_some() => match event { + SwarmEvent::Behaviour(GossipsubEvent::Message { + propagation_source: peer_id, + message_id: id, + message, + }) => println!( + "Got message: {} with id: {} from peer: {:?}", + String::from_utf8_lossy(&message.data), + id, + peer_id + ), + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening on {:?}", address); + } + _ => {} } } - - Poll::Pending - })) + } } diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 4b44ad3f40a..fdeed494141 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -31,8 +31,8 @@ //! //! You can ping this node, or use pubsub (gossipsub) on the topic "chat". For this //! to work, the ipfs node needs to be configured to use gossipsub. -use async_std::{io, task}; -use futures::{future, prelude::*}; +use async_std::io; +use futures::{prelude::*, select}; use libp2p::{ core::{ either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version, @@ -48,15 +48,7 @@ use libp2p::{ yamux::YamuxConfig, Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, }; -use std::{ - env, - error::Error, - fs, - path::Path, - str::FromStr, - task::{Context, Poll}, - time::Duration, -}; +use std::{env, error::Error, fs, path::Path, str::FromStr, time::Duration}; /// Builds the transport that serves as a common ground for all connections. pub fn build_transport( @@ -138,7 +130,8 @@ fn parse_legacy_multiaddr(text: &str) -> Result> { Ok(res) } -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); let ipfs_path: Box = get_ipfs_path(); @@ -270,36 +263,28 @@ fn main() -> Result<(), Box> { } // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Listen on all interfaces and whatever port the OS assigns swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - if let Err(e) = match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm + loop { + select! { + line = stdin.select_next_some() => { + if let Err(e) = swarm .behaviour_mut() .gossipsub - .publish(gossipsub_topic.clone(), line.as_bytes()), - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } { - println!("Publish error: {:?}", e); - } - } - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - if let SwarmEvent::NewListenAddr { address, .. } = event { - println!("Listening on {:?}", address); - } + .publish(gossipsub_topic.clone(), line.expect("Stdin not to close").as_bytes()) + { + println!("Publish error: {:?}", e); + } + }, + event = swarm.select_next_some() => { + if let SwarmEvent::NewListenAddr { address, .. } = event { + println!("Listening on {:?}", address); } - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => break, } } - Poll::Pending - })) + } } diff --git a/examples/ping.rs b/examples/ping.rs index 6a4523b4d77..26223459bfa 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -40,19 +40,18 @@ //! The two nodes establish a connection, negotiate the ping protocol //! and begin pinging each other. -use futures::executor::block_on; use futures::prelude::*; use libp2p::swarm::{Swarm, SwarmEvent}; use libp2p::{identity, ping, Multiaddr, PeerId}; use std::error::Error; -use std::task::Poll; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {:?}", local_peer_id); - let transport = block_on(libp2p::development_transport(local_key))?; + let transport = libp2p::development_transport(local_key).await?; // Create a ping network behaviour. // @@ -75,17 +74,11 @@ fn main() -> Result<(), Box> { println!("Dialed {}", addr) } - block_on(future::poll_fn(move |cx| loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), - SwarmEvent::Behaviour(event) => println!("{:?}", event), - _ => {} - }, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), + SwarmEvent::Behaviour(event) => println!("{:?}", event), + _ => {} } - })); - - Ok(()) + } } diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 74c5d3db69b..116dafef018 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -4,8 +4,12 @@ - Migrate to Rust edition 2021 (see [PR 2339]). +- Propagate messages only to the target peers and not all connected peers (see [PR2360]). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 +[PR 2360]: https://github.com/libp2p/rust-libp2p/pull/2360/ + # 0.32.0 [2021-11-16] - Update dependencies. diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 73b8edb97fa..b2093c763d6 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -253,6 +253,12 @@ impl Floodsub { // Send to peers we know are subscribed to the topic. for (peer_id, sub_topic) in self.connected_peers.iter() { + // Peer must be in a communication list. + if !self.target_peers.contains(peer_id) { + continue; + } + + // Peer must be subscribed for the topic. if !sub_topic .iter() .any(|t| message.topics.iter().any(|u| t == u)) @@ -402,6 +408,12 @@ impl NetworkBehaviour for Floodsub { continue; } + // Peer must be in a communication list. + if !self.target_peers.contains(peer_id) { + continue; + } + + // Peer must be subscribed for the topic. if !subscr_topics .iter() .any(|t| message.topics.iter().any(|u| t == u)) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 270cc6474f3..360dd980325 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -21,7 +21,7 @@ rand = "0.7.3" asynchronous-codec = "0.6" unsigned-varint = { version = "0.7.0", features = ["asynchronous_codec"] } log = "0.4.11" -sha2 = "0.9.1" +sha2 = "0.10.0" base64 = "0.13.0" smallvec = "1.6.1" prost = "0.9" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9daddc5d7af..d9373d89461 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -147,6 +147,7 @@ pub enum GossipsubEvent { /// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] /// for further details. +#[allow(clippy::large_enum_variant)] enum PublishConfig { Signing { keypair: Keypair, diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index a3205f35b2d..9aaa210f4a8 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -16,7 +16,7 @@ futures-timer = "3.0.2" libp2p-core = { version = "0.31.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.33.0", path = "../../swarm" } log = "0.4.1" -lru = "0.6" +lru = "0.7" prost = "0.9" smallvec = "1.6.1" diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index c7dab4fdf22..2e644502014 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -459,6 +459,7 @@ impl NetworkBehaviour for Identify { } /// Event emitted by the `Identify` behaviour. +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum IdentifyEvent { /// Identification information has been received from a peer. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 2b5a126be33..76bab545e7f 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -22,7 +22,7 @@ libp2p-core = { version = "0.31.0", path = "../../core", default-features = fals libp2p-swarm = { version = "0.33.0", path = "../../swarm" } prost = "0.9" rand = "0.7.2" -sha2 = "0.9.1" +sha2 = "0.10.0" smallvec = "1.6.1" uint = "0.9" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 10d9d0d58e7..74e6066afe6 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -11,10 +11,14 @@ - Migrate to Rust edition 2021 (see [PR 2339]). +- Fix generation of peer expiration event and listen on specified IP version (see [PR 2359]). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2311]: https://github.com/libp2p/rust-libp2p/pull/2311/ +[PR 2359]: https://github.com/libp2p/rust-libp2p/pull/2359 + # 0.33.0 [2021-11-16] - Update dependencies. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 1f6fde5ede0..42c666f3328 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -28,4 +28,4 @@ void = "1.0.2" [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } libp2p = { path = "../.." } -tokio = { version = "1.2.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } +tokio = { version = "1.2.0", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 156c805a4d7..ee32b850276 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -144,12 +144,12 @@ impl Mdns { } }; let send_socket = { - let addrs = [ - SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), - SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), - ]; + let addr = match config.multicast_addr { + IpAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), + IpAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), + }; - let socket = std::net::UdpSocket::bind(&addrs[..])?; + let socket = std::net::UdpSocket::bind(addr)?; Async::new(socket)? }; let if_watch = if_watch::IfWatcher::new().await?; @@ -406,7 +406,7 @@ impl NetworkBehaviour for Mdns { while let Some(pos) = self .discovered_nodes .iter() - .position(|(_, _, exp)| *exp < now) + .position(|(_, _, exp)| *exp <= now) { let (peer_id, addr, _) = self.discovered_nodes.remove(pos); expired.push((peer_id, addr)); diff --git a/protocols/mdns/tests/smoke.rs b/protocols/mdns/tests/smoke.rs index 3d374dd4563..707ad9f39ab 100644 --- a/protocols/mdns/tests/smoke.rs +++ b/protocols/mdns/tests/smoke.rs @@ -26,6 +26,7 @@ use libp2p::{ PeerId, }; use std::error::Error; +use std::time::Duration; async fn create_swarm(config: MdnsConfig) -> Result, Box> { let id_keys = identity::Keypair::generate_ed25519(); @@ -37,7 +38,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box> Ok(swarm) } -async fn run_test(config: MdnsConfig) -> Result<(), Box> { +async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; let mut discovered_a = false; @@ -78,24 +79,109 @@ async fn run_test(config: MdnsConfig) -> Result<(), Box> { #[async_std::test] async fn test_discovery_async_std_ipv4() -> Result<(), Box> { - run_test(MdnsConfig::default()).await + run_discovery_test(MdnsConfig::default()).await } #[async_std::test] async fn test_discovery_async_std_ipv6() -> Result<(), Box> { let mut config = MdnsConfig::default(); config.multicast_addr = *IPV6_MDNS_MULTICAST_ADDRESS; - run_test(MdnsConfig::default()).await + run_discovery_test(config).await } #[tokio::test] async fn test_discovery_tokio_ipv4() -> Result<(), Box> { - run_test(MdnsConfig::default()).await + run_discovery_test(MdnsConfig::default()).await } #[tokio::test] async fn test_discovery_tokio_ipv6() -> Result<(), Box> { let mut config = MdnsConfig::default(); config.multicast_addr = *IPV6_MDNS_MULTICAST_ADDRESS; - run_test(MdnsConfig::default()).await + run_discovery_test(config).await +} + +async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { + let mut a = create_swarm(config.clone()).await?; + let mut b = create_swarm(config).await?; + + loop { + futures::select! { + ev = a.select_next_some() => match ev { + SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => { + for (peer, _addr) in peers { + if peer == *b.local_peer_id() { + return Ok(()); + } + } + } + _ => {} + }, + ev = b.select_next_some() => match ev { + SwarmEvent::Behaviour(MdnsEvent::Expired(peers)) => { + for (peer, _addr) in peers { + if peer == *a.local_peer_id() { + return Ok(()); + } + } + } + _ => {} + } + + } + } +} + +#[async_std::test] +async fn test_expired_async_std_ipv4() -> Result<(), Box> { + let config = MdnsConfig { + ttl: Duration::from_millis(500), + query_interval: Duration::from_secs(1), + ..Default::default() + }; + + async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) +} + +#[async_std::test] +async fn test_expired_async_std_ipv6() -> Result<(), Box> { + let config = MdnsConfig { + ttl: Duration::from_millis(500), + query_interval: Duration::from_secs(1), + multicast_addr: *IPV6_MDNS_MULTICAST_ADDRESS, + }; + + async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .map(|_| ()) + .map_err(|e| Box::new(e) as Box) +} + +#[tokio::test] +async fn test_expired_tokio_ipv4() -> Result<(), Box> { + let config = MdnsConfig { + ttl: Duration::from_millis(500), + query_interval: Duration::from_secs(1), + ..Default::default() + }; + + tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .unwrap() +} + +#[tokio::test] +async fn test_expired_tokio_ipv6() -> Result<(), Box> { + let config = MdnsConfig { + ttl: Duration::from_millis(500), + query_interval: Duration::from_secs(1), + multicast_addr: *IPV6_MDNS_MULTICAST_ADDRESS, + }; + + tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) + .await + .unwrap() } diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 42b4ae8dd2d..f2eacc4643a 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -21,7 +21,7 @@ futures = { version = "0.3", default-features = false, features = ["std"] } thiserror = "1" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } bimap = "0.6.1" -sha2 = "0.9" +sha2 = "0.10" rand = "0.8" futures-timer = "3.0.2" instant = "0.1.11" diff --git a/protocols/rendezvous/src/codec.rs b/protocols/rendezvous/src/codec.rs index d050ff8ca9a..d8d54d91177 100644 --- a/protocols/rendezvous/src/codec.rs +++ b/protocols/rendezvous/src/codec.rs @@ -28,6 +28,7 @@ use unsigned_varint::codec::UviBytes; pub type Ttl = u64; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] pub enum Message { Register(NewRegistration), diff --git a/protocols/rendezvous/src/handler.rs b/protocols/rendezvous/src/handler.rs index b4883825e25..d07bf4d248f 100644 --- a/protocols/rendezvous/src/handler.rs +++ b/protocols/rendezvous/src/handler.rs @@ -28,6 +28,7 @@ pub mod inbound; pub mod outbound; /// Errors that can occur while interacting with a substream. +#[allow(clippy::large_enum_variant)] #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Reading message {0:?} at this stage is a protocol violation")] diff --git a/protocols/rendezvous/src/handler/inbound.rs b/protocols/rendezvous/src/handler/inbound.rs index 8a18f366c68..d4452b5758f 100644 --- a/protocols/rendezvous/src/handler/inbound.rs +++ b/protocols/rendezvous/src/handler/inbound.rs @@ -54,6 +54,7 @@ impl fmt::Debug for Stream { } } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] pub enum OutEvent { RegistrationRequested(NewRegistration), diff --git a/protocols/rendezvous/src/handler/outbound.rs b/protocols/rendezvous/src/handler/outbound.rs index ab06040ca19..c1356ee29d0 100644 --- a/protocols/rendezvous/src/handler/outbound.rs +++ b/protocols/rendezvous/src/handler/outbound.rs @@ -120,6 +120,7 @@ pub enum OutEvent { }, } +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum OpenInfo { RegisterRequest(NewRegistration), diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 2f29dc3cdaf..af6b601eae0 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -231,8 +231,6 @@ impl std::error::Error for InboundFailure {} /// See [`RequestResponse::send_response`]. #[derive(Debug)] pub struct ResponseChannel { - request_id: RequestId, - peer: PeerId, sender: oneshot::Sender, } @@ -593,6 +591,29 @@ where addresses } + fn inject_address_change( + &mut self, + peer: &PeerId, + conn: &ConnectionId, + _old: &ConnectedPoint, + new: &ConnectedPoint, + ) { + let new_address = match new { + ConnectedPoint::Dialer { address, .. } => Some(address.clone()), + ConnectedPoint::Listener { .. } => None, + }; + let connections = self + .connected + .get_mut(peer) + .expect("Address change can only happen on an established connection."); + + let connection = connections + .iter_mut() + .find(|c| &c.id == conn) + .expect("Address change can only happen on an established connection."); + connection.address = new_address; + } + fn inject_connected(&mut self, peer: &PeerId) { if let Some(pending) = self.pending_outbound_requests.remove(peer) { for request in pending { @@ -727,11 +748,7 @@ where request, sender, } => { - let channel = ResponseChannel { - request_id, - peer, - sender, - }; + let channel = ResponseChannel { sender }; let message = RequestResponseMessage::Request { request_id, request, diff --git a/src/tutorial.rs b/src/tutorial.rs index 2f76c5c236a..bbc7f50d59e 100644 --- a/src/tutorial.rs +++ b/src/tutorial.rs @@ -39,7 +39,8 @@ //! 1. Creating a new crate: `cargo init rust-libp2p-tutorial` //! //! 2. Adding `libp2p` as well as `futures` as a dependency in the -//! `Cargo.toml` file: +//! `Cargo.toml` file. We will also include `async-std` with the +//! "attributes" feature to allow for an `async main`: //! //! ```yaml //! [package] @@ -52,13 +53,15 @@ //! [dependencies] //! libp2p = "" //! futures = "" +//! async-std = { version = "", features = ["attributes"] } //! ``` //! //! ## Network identity //! //! With all the scaffolding in place, we can dive into the libp2p specifics. At -//! first we need to create a network identity for our local node in `fn -//! main()`. Identities in libp2p are handled via a public and private key pair. +//! first we need to create a network identity for our local node in `async fn +//! main()`, annotated with an attribute to allow `main` to be `async`. +//! Identities in libp2p are handled via a public and private key pair. //! Nodes identify each other via their [`PeerId`](crate::PeerId) which is //! derived from the public key. //! @@ -66,7 +69,8 @@ //! use libp2p::{identity, PeerId}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); @@ -98,16 +102,16 @@ //! [`crate::core::muxing`] and [`yamux`](crate::yamux). //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, PeerId}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! Ok(()) //! } @@ -138,17 +142,17 @@ //! [`Ping`](crate::ping::Ping) [`NetworkBehaviour`] at the end: //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, PeerId}; //! use libp2p::ping::{Ping, PingConfig}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -171,18 +175,18 @@ //! [`Transport`] to the [`NetworkBehaviour`]. //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, PeerId}; //! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::swarm::Swarm; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -222,18 +226,18 @@ //! remote peer. //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, Multiaddr, PeerId}; //! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::swarm::{Swarm, dial_opts::DialOpts}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -267,20 +271,19 @@ //! outgoing connection in case we specify an address on the CLI. //! //! ```no_run -//! use futures::executor::block_on; //! use futures::prelude::*; //! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::swarm::{Swarm, SwarmEvent, dial_opts::DialOpts}; //! use libp2p::{identity, Multiaddr, PeerId}; //! use std::error::Error; -//! use std::task::Poll; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -303,19 +306,14 @@ //! println!("Dialed {}", addr) //! } //! -//! block_on(future::poll_fn(move |cx| loop { -//! match swarm.poll_next_unpin(cx) { -//! Poll::Ready(Some(event)) => match event { -//! SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), -//! SwarmEvent::Behaviour(event) => println!("{:?}", event), -//! _ => {} -//! }, -//! Poll::Ready(None) => return Poll::Ready(()), -//! Poll::Pending => return Poll::Pending +//! loop { +//! match swarm.select_next_some().await { +//! SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), +//! SwarmEvent::Behaviour(event) => println!("{:?}", event), +//! _ => {} //! } -//! })); +//! } //! -//! Ok(()) //! } //! ``` //! diff --git a/swarm-derive/Cargo.toml b/swarm-derive/Cargo.toml index e562859b522..c53240ada0d 100644 --- a/swarm-derive/Cargo.toml +++ b/swarm-derive/Cargo.toml @@ -19,4 +19,5 @@ quote = "1.0" [dev-dependencies] libp2p = { path = "../" } +either = "1.6.0" futures = "0.3.1" diff --git a/swarm-derive/tests/test.rs b/swarm-derive/tests/test.rs index 829c9b71e34..c5182b0481a 100644 --- a/swarm-derive/tests/test.rs +++ b/swarm-derive/tests/test.rs @@ -324,3 +324,98 @@ fn event_process_false() { }; } } + +#[test] +fn with_toggle() { + use libp2p::swarm::behaviour::toggle::Toggle; + + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + #[behaviour(event_process = true)] + struct Foo { + identify: libp2p::identify::Identify, + ping: Toggle, + } + + impl libp2p::swarm::NetworkBehaviourEventProcess for Foo { + fn inject_event(&mut self, _: libp2p::identify::IdentifyEvent) {} + } + + impl libp2p::swarm::NetworkBehaviourEventProcess for Foo { + fn inject_event(&mut self, _: libp2p::ping::PingEvent) {} + } + + #[allow(dead_code)] + fn foo() { + require_net_behaviour::(); + } +} + +#[test] +fn with_either() { + use either::Either; + + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + #[behaviour(event_process = true)] + struct Foo { + kad: libp2p::kad::Kademlia, + ping_or_identify: Either, + } + + impl libp2p::swarm::NetworkBehaviourEventProcess for Foo { + fn inject_event(&mut self, _: libp2p::kad::KademliaEvent) {} + } + + impl + libp2p::swarm::NetworkBehaviourEventProcess< + Either, + > for Foo + { + fn inject_event( + &mut self, + _: Either, + ) { + } + } + + #[allow(dead_code)] + fn foo() { + require_net_behaviour::(); + } +} + +#[test] +fn no_event_with_either() { + use either::Either; + + enum BehaviourOutEvent { + Kad(libp2p::kad::KademliaEvent), + PingOrIdentify(Either), + } + + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + #[behaviour(out_event = "BehaviourOutEvent", event_process = false)] + struct Foo { + kad: libp2p::kad::Kademlia, + ping_or_identify: Either, + } + + impl From for BehaviourOutEvent { + fn from(event: libp2p::kad::KademliaEvent) -> Self { + BehaviourOutEvent::Kad(event) + } + } + + impl From> for BehaviourOutEvent { + fn from(event: Either) -> Self { + BehaviourOutEvent::PingOrIdentify(event) + } + } + + #[allow(dead_code)] + fn foo() { + require_net_behaviour::(); + } +} diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 45b7f2eecd8..49f42931d79 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,8 +6,20 @@ - Migrate to Rust edition 2021 (see [PR 2339]). +- Update `Connection::address` on `inject_address_change` (see [PR 2362]). + +- Move `swarm::Toggle` to `swarm::behaviour::Toggle` (see [PR 2375]). + +- Add `Swarm::connected_peers` (see [PR 2378]). + +- Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 +[PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 +[PR 2370]: https://github.com/libp2p/rust-libp2p/pull/2370 +[PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375 +[PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378 # 0.32.0 [2021-11-16] diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index dfb9161496e..e167c53d428 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -22,6 +22,6 @@ futures-timer = "3.0.2" instant = "0.1.11" [dev-dependencies] -libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] } +libp2p = { path = "../", default-features = false, features = ["identify", "ping", "plaintext", "yamux"] } quickcheck = "0.9.0" rand = "0.7.2" diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index f50b0250f6f..269bddf3bad 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -18,6 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +pub mod either; +pub mod toggle; + use crate::dial_opts::DialOpts; use crate::protocols_handler::{IntoProtocolsHandler, ProtocolsHandler}; use crate::{AddressRecord, AddressScore, DialError}; @@ -31,18 +34,48 @@ use std::{task::Context, task::Poll}; type THandlerInEvent = <::Handler as ProtocolsHandler>::InEvent; -/// A behaviour for the network. Allows customizing the swarm. +/// A [`NetworkBehaviour`] defines the behaviour of the local node on the network. /// -/// This trait has been designed to be composable. Multiple implementations can be combined into -/// one that handles all the behaviours at once. +/// In contrast to [`Transport`](libp2p_core::Transport) which defines **how** to send bytes on the +/// network, [`NetworkBehaviour`] defines **what** bytes to send and **to whom**. /// -/// # Deriving `NetworkBehaviour` +/// Each protocol (e.g. `libp2p-ping`, `libp2p-identify` or `libp2p-kad`) implements +/// [`NetworkBehaviour`]. Multiple implementations of [`NetworkBehaviour`] can be composed into a +/// hierarchy of [`NetworkBehaviour`]s where parent implementations delegate to child +/// implementations. Finally the root of the [`NetworkBehaviour`] hierarchy is passed to +/// [`Swarm`](crate::Swarm) where it can then control the behaviour of the local node on a libp2p +/// network. /// -/// Crate users can implement this trait by using the the `#[derive(NetworkBehaviour)]` -/// proc macro re-exported by the `libp2p` crate. The macro generates a delegating `trait` -/// implementation for the `struct`, which delegates method calls to all struct members. +/// # Hierarchy of [`NetworkBehaviour`] +/// +/// To compose multiple [`NetworkBehaviour`] implementations into a single [`NetworkBehaviour`] +/// implementation, potentially building a multi-level hierarchy of [`NetworkBehaviour`]s, one can +/// use one of the [`NetworkBehaviour`] combinators, and/or use the [`NetworkBehaviour`] derive +/// macro. +/// +/// ## Combinators /// -/// Struct members that don't implement [`NetworkBehaviour`] must be annotated with `#[behaviour(ignore)]`. +/// [`NetworkBehaviour`] combinators wrap one or more [`NetworkBehaviour`] implementations and +/// implement [`NetworkBehaviour`] themselves. Example is the +/// [`Toggle`](crate::behaviour::toggle::Toggle) [`NetworkBehaviour`]. +/// +/// ``` rust +/// # use libp2p_swarm::DummyBehaviour; +/// # use libp2p_swarm::behaviour::toggle::Toggle; +/// let my_behaviour = DummyBehaviour::default(); +/// let my_toggled_behaviour = Toggle::from(Some(my_behaviour)); +/// ``` +/// +/// ## Derive Macro +/// +/// One can derive [`NetworkBehaviour`] for a custom `struct` via the `#[derive(NetworkBehaviour)]` +/// proc macro re-exported by the `libp2p` crate. The macro generates a delegating `trait` +/// implementation for the custom `struct`. Each [`NetworkBehaviour`] trait method is simply +/// delegated to each `struct` member in the order the `struct` is defined. For example for +/// [`NetworkBehaviour::poll`] it will first poll the first `struct` member until it returns +/// [`Poll::Pending`] before moving on to later members. For [`NetworkBehaviour::addresses_of_peer`] +/// it will delegate to each `struct` member and return a concatenated array of all addresses +/// returned by the struct members. /// /// By default the derive sets the [`NetworkBehaviour::OutEvent`] as `()` but this can be overridden /// with `#[behaviour(out_event = "AnotherType")]`. @@ -50,7 +83,72 @@ type THandlerInEvent = /// When setting a custom `out_event` users have to implement [`From`] converting from each of the /// event types generated by the struct members to the custom `out_event`. /// -/// Alternatively, users can specify `#[behaviour(event_process = true)]`. Events generated by the +/// ``` rust +/// # use libp2p::identify::{Identify, IdentifyEvent}; +/// # use libp2p::ping::{Ping, PingEvent}; +/// # use libp2p::NetworkBehaviour; +/// #[derive(NetworkBehaviour)] +/// #[behaviour(out_event = "Event")] +/// struct MyBehaviour { +/// identify: Identify, +/// ping: Ping, +/// } +/// +/// enum Event { +/// Identify(IdentifyEvent), +/// Ping(PingEvent), +/// } +/// +/// impl From for Event { +/// fn from(event: IdentifyEvent) -> Self { +/// Self::Identify(event) +/// } +/// } +/// +/// impl From for Event { +/// fn from(event: PingEvent) -> Self { +/// Self::Ping(event) +/// } +/// } +/// ``` +/// +/// Struct members that don't implement [`NetworkBehaviour`] must be annotated with +/// `#[behaviour(ignore)]`. +/// +/// ``` rust +/// # use libp2p::identify::{Identify, IdentifyEvent}; +/// # use libp2p::ping::{Ping, PingEvent}; +/// # use libp2p::NetworkBehaviour; +/// #[derive(NetworkBehaviour)] +/// #[behaviour(out_event = "Event")] +/// struct MyBehaviour { +/// identify: Identify, +/// ping: Ping, +/// +/// #[behaviour(ignore)] +/// some_string: String, +/// } +/// # +/// # enum Event { +/// # Identify(IdentifyEvent), +/// # Ping(PingEvent), +/// # } +/// # +/// # impl From for Event { +/// # fn from(event: IdentifyEvent) -> Self { +/// # Self::Identify(event) +/// # } +/// # } +/// # +/// # impl From for Event { +/// # fn from(event: PingEvent) -> Self { +/// # Self::Ping(event) +/// # } +/// # } +/// ``` +/// +/// For users that need access to the root [`NetworkBehaviour`] implementation while processing +/// emitted events, one can specify `#[behaviour(event_process = true)]`. Events generated by the /// struct members are delegated to [`NetworkBehaviourEventProcess`] implementations. Those must be /// provided by the user on the type that [`NetworkBehaviour`] is derived on. /// @@ -621,6 +719,50 @@ where } } +impl NetworkBehaviourAction +where + THandlerOld: IntoProtocolsHandler, + ::Handler: ProtocolsHandler, +{ + /// Map the handler and handler event. + pub fn map_handler_and_in( + self, + f_handler: impl FnOnce(THandlerOld) -> THandlerNew, + f_in_event: impl FnOnce(TInEventOld) -> TInEventNew, + ) -> NetworkBehaviourAction + where + THandlerNew: IntoProtocolsHandler, + ::Handler: ProtocolsHandler, + { + match self { + NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), + NetworkBehaviourAction::Dial { opts, handler } => NetworkBehaviourAction::Dial { + opts, + handler: f_handler(handler), + }, + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: f_in_event(event), + }, + NetworkBehaviourAction::ReportObservedAddr { address, score } => { + NetworkBehaviourAction::ReportObservedAddr { address, score } + } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }, + } + } +} + /// The options w.r.t. which connection handler to notify of an event. #[derive(Debug, Clone)] pub enum NotifyHandler { diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs new file mode 100644 index 00000000000..3dd6d28a3d2 --- /dev/null +++ b/swarm/src/behaviour/either.rs @@ -0,0 +1,248 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::protocols_handler::{either::IntoEitherHandler, IntoProtocolsHandler, ProtocolsHandler}; +use crate::{ + DialError, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, + PollParameters, +}; +use either::Either; +use libp2p_core::{ + connection::{ConnectionId, ListenerId}, + ConnectedPoint, Multiaddr, PeerId, +}; +use std::{task::Context, task::Poll}; + +/// Implementation of [`NetworkBehaviour`] that can be either of two implementations. +impl NetworkBehaviour for Either +where + L: NetworkBehaviour, + R: NetworkBehaviour, +{ + type ProtocolsHandler = IntoEitherHandler; + type OutEvent = Either; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + match self { + Either::Left(a) => IntoEitherHandler::Left(a.new_handler()), + Either::Right(b) => IntoEitherHandler::Right(b.new_handler()), + } + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + match self { + Either::Left(a) => a.addresses_of_peer(peer_id), + Either::Right(b) => b.addresses_of_peer(peer_id), + } + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + match self { + Either::Left(a) => a.inject_connected(peer_id), + Either::Right(b) => b.inject_connected(peer_id), + }; + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + match self { + Either::Left(a) => a.inject_disconnected(peer_id), + Either::Right(b) => b.inject_disconnected(peer_id), + } + } + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + connection: &ConnectionId, + endpoint: &ConnectedPoint, + errors: Option<&Vec>, + ) { + match self { + Either::Left(a) => { + a.inject_connection_established(peer_id, connection, endpoint, errors) + } + Either::Right(b) => { + b.inject_connection_established(peer_id, connection, endpoint, errors) + } + } + } + + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + connection: &ConnectionId, + endpoint: &ConnectedPoint, + handler: ::Handler, + ) { + match (self, handler) { + (Either::Left(behaviour), Either::Left(handler)) => { + behaviour.inject_connection_closed(peer_id, connection, endpoint, handler) + } + (Either::Right(behaviour), Either::Right(handler)) => { + behaviour.inject_connection_closed(peer_id, connection, endpoint, handler) + } + _ => unreachable!(), + } + } + + fn inject_address_change( + &mut self, + peer_id: &PeerId, + connection: &ConnectionId, + old: &ConnectedPoint, + new: &ConnectedPoint, + ) { + match self { + Either::Left(a) => a.inject_address_change(peer_id, connection, old, new), + Either::Right(b) => b.inject_address_change(peer_id, connection, old, new), + } + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + match (self, event) { + (Either::Left(behaviour), Either::Left(event)) => { + behaviour.inject_event(peer_id, connection, event) + } + (Either::Right(behaviour), Either::Right(event)) => { + behaviour.inject_event(peer_id, connection, event) + } + _ => unreachable!(), + } + } + + fn inject_dial_failure( + &mut self, + peer_id: Option, + handler: Self::ProtocolsHandler, + error: &DialError, + ) { + match (self, handler) { + (Either::Left(behaviour), IntoEitherHandler::Left(handler)) => { + behaviour.inject_dial_failure(peer_id, handler, error) + } + (Either::Right(behaviour), IntoEitherHandler::Right(handler)) => { + behaviour.inject_dial_failure(peer_id, handler, error) + } + _ => unreachable!(), + } + } + + fn inject_listen_failure( + &mut self, + local_addr: &Multiaddr, + send_back_addr: &Multiaddr, + handler: Self::ProtocolsHandler, + ) { + match (self, handler) { + (Either::Left(behaviour), IntoEitherHandler::Left(handler)) => { + behaviour.inject_listen_failure(local_addr, send_back_addr, handler) + } + (Either::Right(behaviour), IntoEitherHandler::Right(handler)) => { + behaviour.inject_listen_failure(local_addr, send_back_addr, handler) + } + _ => unreachable!(), + } + } + + fn inject_new_listener(&mut self, id: ListenerId) { + match self { + Either::Left(a) => a.inject_new_listener(id), + Either::Right(b) => b.inject_new_listener(id), + } + } + + fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { + match self { + Either::Left(a) => a.inject_new_listen_addr(id, addr), + Either::Right(b) => b.inject_new_listen_addr(id, addr), + } + } + + fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { + match self { + Either::Left(a) => a.inject_expired_listen_addr(id, addr), + Either::Right(b) => b.inject_expired_listen_addr(id, addr), + } + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + match self { + Either::Left(a) => a.inject_new_external_addr(addr), + Either::Right(b) => b.inject_new_external_addr(addr), + } + } + + fn inject_expired_external_addr(&mut self, addr: &Multiaddr) { + match self { + Either::Left(a) => a.inject_expired_external_addr(addr), + Either::Right(b) => b.inject_expired_external_addr(addr), + } + } + + fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { + match self { + Either::Left(a) => a.inject_listener_error(id, err), + Either::Right(b) => b.inject_listener_error(id, err), + } + } + + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { + match self { + Either::Left(a) => a.inject_listener_closed(id, reason), + Either::Right(b) => b.inject_listener_closed(id, reason), + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { + let event = match self { + Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params)) + .map_out(|e| Either::Left(e)) + .map_handler_and_in(|h| IntoEitherHandler::Left(h), |e| Either::Left(e)), + Either::Right(behaviour) => futures::ready!(behaviour.poll(cx, params)) + .map_out(|e| Either::Right(e)) + .map_handler_and_in(|h| IntoEitherHandler::Right(h), |e| Either::Right(e)), + }; + + Poll::Ready(event) + } +} + +impl NetworkBehaviourEventProcess + for Either +where + TBehaviourLeft: NetworkBehaviourEventProcess, + TBehaviourRight: NetworkBehaviourEventProcess, +{ + fn inject_event(&mut self, event: TEvent) { + match self { + Either::Left(a) => a.inject_event(event), + Either::Right(b) => b.inject_event(event), + } + } +} diff --git a/swarm/src/toggle.rs b/swarm/src/behaviour/toggle.rs similarity index 100% rename from swarm/src/toggle.rs rename to swarm/src/behaviour/toggle.rs diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 196a12126fa..fcc71e51518 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -53,15 +53,14 @@ //! are supported, when to open a new outbound substream, etc. //! -mod behaviour; mod registry; #[cfg(test)] mod test; mod upgrade; +pub mod behaviour; pub mod dial_opts; pub mod protocols_handler; -pub mod toggle; pub use behaviour::{ CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, @@ -599,6 +598,11 @@ where self.network.is_connected(peer_id) } + /// Returns the currently connected peers. + pub fn connected_peers(&self) -> impl Iterator { + self.network.connected_peers() + } + /// Returns a reference to the provided [`NetworkBehaviour`]. pub fn behaviour(&self) -> &TBehaviour { &self.behaviour diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 5907aa3ae1b..e6eca2a5b43 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -17,7 +17,7 @@ libp2p-core = { version = "0.31.0", path = "../../core", default-features = fals log = "0.4" prost = "0.9" rand = "0.8.3" -sha2 = "0.9.1" +sha2 = "0.10.0" static_assertions = "1" x25519-dalek = "1.1.0" zeroize = "1" diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index d6141483a40..68506dd77c0 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -420,7 +420,7 @@ where } /// Legacy configuration options. -#[derive(Clone)] +#[derive(Clone, Default)] pub struct LegacyConfig { /// Whether to continue sending legacy handshake payloads, /// i.e. length-prefixed protobuf payloads inside a length-prefixed @@ -433,12 +433,3 @@ pub struct LegacyConfig { /// libp2p implementations. pub recv_legacy_handshake: bool, } - -impl Default for LegacyConfig { - fn default() -> Self { - Self { - send_legacy_handshake: false, - recv_legacy_handshake: false, - } - } -} diff --git a/transports/pnet/Cargo.toml b/transports/pnet/Cargo.toml index 3d3cbb5a827..b674f18ab97 100644 --- a/transports/pnet/Cargo.toml +++ b/transports/pnet/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.1" log = "0.4.8" salsa20 = "0.9" -sha3 = "0.9" +sha3 = "0.10" rand = "0.7" pin-project = "1.0.2" diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index a4e09a7bdbd..d1456ac478f 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -15,7 +15,7 @@ async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" if-watch = { version = "0.2.0", optional = true } -if-addrs = { version = "0.6.4", optional = true } +if-addrs = { version = "0.7.0", optional = true } ipnet = "2.0.0" libc = "0.2.80" libp2p-core = { version = "0.31.0", path = "../../core", default-features = false }