Skip to content

Commit

Permalink
Merge branch 'master' of github.com:libp2p/rust-libp2p into update-if…
Browse files Browse the repository at this point in the history
…-watch
  • Loading branch information
jxs committed Nov 16, 2022
2 parents 788c8b6 + 43fdfe2 commit ff02f71
Show file tree
Hide file tree
Showing 65 changed files with 667 additions and 334 deletions.
1 change: 1 addition & 0 deletions .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pull_request_rules:
conditions:
- conflict
- -draft # Draft PRs are allowed to have conflicts.
- -author=dependabot[bot]
actions:
comment:
message: This pull request has merge conflicts. Could you please resolve them @{{author}}? 🙏
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ full = [
"websocket",
"yamux",
]
async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"]

async-std = ["libp2p-swarm/async-std", "libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"]
autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand All @@ -74,7 +75,7 @@ rsa = ["libp2p-core/rsa"]
secp256k1 = ["libp2p-core/secp256k1"]
serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"]
tcp = ["dep:libp2p-tcp"]
tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"]
tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"]
uds = ["dep:libp2p-uds"]
wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"]
wasm-ext = ["dep:libp2p-wasm-ext"]
Expand Down
3 changes: 3 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

- Hide `prost::Error` from public API in `FromEnvelopeError::InvalidPeerRecord` and `signed_envelope::DecodingError`. See [PR 3058].

- Move `Executor` to `libp2p-swarm`. See [PR 3097].

[PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031
[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058
[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097

# 0.37.0

Expand Down
21 changes: 1 addition & 20 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub use identity::PublicKey;
pub use multiaddr::Multiaddr;
pub use multihash;
pub use muxing::StreamMuxer;
pub use peer_id::ParseError;
pub use peer_id::PeerId;
pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope;
Expand All @@ -82,23 +83,3 @@ pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, U
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct DecodeError(prost::DecodeError);

use std::{future::Future, pin::Pin};

/// Implemented on objects that can run a `Future` in the background.
///
/// > **Note**: While it may be tempting to implement this trait on types such as
/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is
/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically
/// > be used as fallback by libp2p. The `Executor` trait should therefore only be
/// > about running `Future`s in the background.
pub trait Executor {
/// Run the given future in the background until it ends.
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
}

impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for F {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
self(f)
}
}
4 changes: 2 additions & 2 deletions core/src/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ const MAX_INLINE_KEY_LENGTH: usize = 42;

/// Identifier of a peer of the network.
///
/// The data is a multihash of the public key of the peer.
/// See the [spec](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md) for more information.
/// The data is a CIDv0 compatible multihash of the protobuf encoded public key of the peer
/// as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md).
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct PeerId {
multihash: Multihash,
Expand Down
23 changes: 6 additions & 17 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p::{
core::upgrade,
floodsub::{self, Floodsub, FloodsubEvent},
identity, mdns, mplex, noise,
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
swarm::{NetworkBehaviour, SwarmEvent},
tcp, Multiaddr, PeerId, Transport,
};
use std::error::Error;
Expand Down Expand Up @@ -91,23 +91,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Create a Swarm to manage peers and events.
let mut swarm = {
let mdns = mdns::tokio::Behaviour::new(Default::default())?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
};

behaviour.floodsub.subscribe(floodsub_topic.clone());

SwarmBuilder::new(transport, behaviour, peer_id)
// We want the connection background tasks to be spawned
// onto the tokio runtime.
.executor(Box::new(|fut| {
tokio::spawn(fut);
}))
.build()
let mdns_behaviour = mdns::Behaviour::new(Default::default())?;
let behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns: mdns_behaviour,
};
let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id);

// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
Expand Down
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

behaviour.floodsub.subscribe(floodsub_topic.clone());
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_threadpool_executor(transport, behaviour, local_peer_id)
};

// Reach out to another node if specified
Expand Down
2 changes: 1 addition & 1 deletion examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = mdns::async_io::Behaviour::new(mdns::Config::default())?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
Expand Down
9 changes: 3 additions & 6 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ mod network {
ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::swarm::{
ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent,
};
use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent};
use std::collections::{hash_map, HashMap, HashSet};
use std::iter;

Expand Down Expand Up @@ -252,7 +250,7 @@ mod network {

// Build the Swarm, connecting the lower layer transport logic with the
// higher layer network behaviour logic.
let swarm = SwarmBuilder::new(
let swarm = Swarm::with_threadpool_executor(
libp2p::development_transport(id_keys).await?,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
Expand All @@ -263,8 +261,7 @@ mod network {
),
},
peer_id,
)
.build();
);

let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Expand Down
2 changes: 1 addition & 1 deletion examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = {
let mdns = mdns::async_io::Behaviour::new(mdns::Config::default())?;
let behaviour = MyBehaviour { gossipsub, mdns };
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone());
}

Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Order Kademlia to search for a peer.
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

println!("Subscribing to {gossipsub_topic:?}");
behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap();
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Reach out to other nodes if specified
Expand Down
2 changes: 1 addition & 1 deletion examples/mdns-passive-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Swarm that establishes connections through the given transport.
// Note that the MDNS behaviour itself will not actually inititiate any connections,
// as it only uses UDP.
let mut swarm = Swarm::new(transport, behaviour, peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

loop {
Expand Down
2 changes: 1 addition & 1 deletion examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let transport = libp2p::development_transport(local_key).await?;

let mut swarm = Swarm::new(transport, Behaviour::default(), local_peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, Behaviour::default(), local_peer_id);

// Tell the swarm to listen on all interfaces and a random, OS-assigned
// port.
Expand Down
4 changes: 4 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

- Update to `libp2p-gossipsub` `v0.43.0`.

- Add `protocol_stack` metrics. See [PR 2982].

[PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/

# 0.10.0

- Update to `libp2p-swarm` `v0.40.0`.
Expand Down
27 changes: 23 additions & 4 deletions misc/metrics/examples/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use futures::stream::StreamExt;
use libp2p::core::Multiaddr;
use libp2p::metrics::{Metrics, Recorder};
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{identity, ping, PeerId, Swarm};
use libp2p::{identify, identity, ping, PeerId, Swarm};
use libp2p_swarm::keep_alive;
use log::info;
use prometheus_client::registry::Registry;
Expand All @@ -68,11 +68,12 @@ fn main() -> Result<(), Box<dyn Error>> {

let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
let local_pub_key = local_key.public();
info!("Local peer id: {:?}", local_peer_id);

let mut swarm = Swarm::new(
let mut swarm = Swarm::without_executor(
block_on(libp2p::development_transport(local_key))?,
Behaviour::default(),
Behaviour::new(local_pub_key),
local_peer_id,
);

Expand All @@ -95,6 +96,10 @@ fn main() -> Result<(), Box<dyn Error>> {
info!("{:?}", ping_event);
metrics.record(&ping_event);
}
SwarmEvent::Behaviour(BehaviourEvent::Identify(identify_event)) => {
info!("{:?}", identify_event);
metrics.record(&identify_event);
}
swarm_event => {
info!("{:?}", swarm_event);
metrics.record(&swarm_event);
Expand All @@ -109,8 +114,22 @@ fn main() -> Result<(), Box<dyn Error>> {
///
/// For illustrative purposes, this includes the [`keep_alive::Behaviour`]) behaviour so the ping actually happen
/// and can be observed via the metrics.
#[derive(NetworkBehaviour, Default)]
#[derive(NetworkBehaviour)]
struct Behaviour {
identify: identify::Behaviour,
keep_alive: keep_alive::Behaviour,
ping: ping::Behaviour,
}

impl Behaviour {
fn new(local_pub_key: libp2p::identity::PublicKey) -> Self {
Self {
ping: ping::Behaviour::default(),
identify: identify::Behaviour::new(identify::Config::new(
"/ipfs/0.1.0".into(),
local_pub_key,
)),
keep_alive: keep_alive::Behaviour::default(),
}
}
}
25 changes: 24 additions & 1 deletion misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol_stack;
use libp2p_core::PeerId;
use prometheus_client::encoding::text::{EncodeMetric, Encoder};
use prometheus_client::encoding::text::{Encode, EncodeMetric, Encoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
Expand All @@ -36,6 +38,7 @@ pub struct Metrics {
received_info_listen_addrs: Histogram,
received_info_protocols: Histogram,
sent: Counter,
listen_addresses: Family<AddressLabels, Counter>,
}

impl Metrics {
Expand Down Expand Up @@ -100,6 +103,13 @@ impl Metrics {
Box::new(sent.clone()),
);

let listen_addresses = Family::default();
sub_registry.register(
"listen_addresses",
"Number of listen addresses for remote peer per protocol stack",
Box::new(listen_addresses.clone()),
);

Self {
protocols,
error,
Expand All @@ -108,6 +118,7 @@ impl Metrics {
received_info_listen_addrs,
received_info_protocols,
sent,
listen_addresses,
}
}
}
Expand Down Expand Up @@ -167,6 +178,13 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
.observe(info.protocols.len() as f64);
self.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64);
for listen_addr in &info.listen_addrs {
self.listen_addresses
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(listen_addr),
})
.inc();
}
}
libp2p_identify::Event::Sent { .. } => {
self.sent.inc();
Expand All @@ -190,6 +208,11 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
}
}

#[derive(Encode, Hash, Clone, Eq, PartialEq)]
struct AddressLabels {
protocols: String,
}

#[derive(Default, Clone)]
struct Protocols {
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,
Expand Down
1 change: 1 addition & 0 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod identify;
mod kad;
#[cfg(feature = "ping")]
mod ping;
mod protocol_stack;
#[cfg(feature = "relay")]
mod relay;
mod swarm;
Expand Down
27 changes: 27 additions & 0 deletions misc/metrics/src/protocol_stack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use libp2p_core::multiaddr::Multiaddr;

pub fn as_string(ma: &Multiaddr) -> String {
let len = ma
.protocol_stack()
.fold(0, |acc, proto| acc + proto.len() + 1);
let mut protocols = String::with_capacity(len);
for proto_tag in ma.protocol_stack() {
protocols.push('/');
protocols.push_str(proto_tag);
}
protocols
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn ip6_tcp_wss_p2p() {
let ma = Multiaddr::try_from("/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095/tcp/8000/wss/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC").expect("testbad");

let protocol_stack = as_string(&ma);

assert_eq!(protocol_stack, "/ip6/tcp/wss/p2p");
}
}
Loading

0 comments on commit ff02f71

Please sign in to comment.