Skip to content

Commit

Permalink
protocols/kad: Improve options to efficiently retrieve (#2712)
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored Nov 25, 2022
1 parent cff84f1 commit a997181
Show file tree
Hide file tree
Showing 9 changed files with 611 additions and 364 deletions.
34 changes: 17 additions & 17 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmEvent},
PeerId, Swarm,
};
use libp2p_kad::{GetProvidersOk, GetRecordOk};
use std::error::Error;

#[async_std::main]
Expand Down Expand Up @@ -120,33 +121,32 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, providers, .. })) => {
for peer in providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(ok.key.as_ref()).unwrap()
"Peer {peer:?} provides key {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {err:?}");
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
QueryResult::GetRecord(Ok(
GetRecordOk::FoundRecord(PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
})
)) => {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
QueryResult::GetRecord(Ok(_)) => {}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {err:?}");
}
Expand Down Expand Up @@ -191,7 +191,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
}
}
};
kademlia.get_record(key, Quorum::One);
kademlia.get_record(key);
}
Some("GET_PROVIDERS") => {
let key = {
Expand Down
35 changes: 27 additions & 8 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ mod network {
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(_),
..
Expand All @@ -426,18 +426,37 @@ mod network {
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
providers,
..
})),
..
},
)) => {
let _ = self
.pending_get_providers
.remove(&id)
.expect("Completed query to be previously pending.")
.send(providers);
if let Some(sender) = self.pending_get_providers.remove(&id) {
sender.send(providers).expect("Receiver not to be dropped");

// Finish the query. We are only interested in the first result.
self.swarm
.behaviour_mut()
.kademlia
.query_mut(&id)
.unwrap()
.finish();
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
result:
QueryResult::GetProviders(Ok(
GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
)),
..
},
)) => {}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::Message { message, .. },
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(result),
..
}) = event
Expand Down
4 changes: 4 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090].

- Changed `Metrics::query_result_get_record_ok` from `Histogram` to a `Counter`.
See [PR 2712].

[PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712

# 0.10.0

Expand Down
23 changes: 14 additions & 9 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::{Registry, Unit};

pub struct Metrics {
query_result_get_record_ok: Histogram,
query_result_get_record_ok: Counter,
query_result_get_record_error: Family<GetRecordResult, Counter>,

query_result_get_closest_peers_ok: Histogram,
Expand All @@ -48,7 +48,7 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("kad");

let query_result_get_record_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10));
let query_result_get_record_ok = Counter::default();
sub_registry.register(
"query_result_get_record_ok",
"Number of records returned by a successful Kademlia get record query.",
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
Expand All @@ -180,9 +180,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {

match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.query_result_get_record_ok
.observe(ok.records.len() as f64),
Ok(libp2p_kad::GetRecordOk::FoundRecord(_)) => {
self.query_result_get_record_ok.inc();
}
Ok(libp2p_kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {}
Err(error) => {
self.query_result_get_record_error
.get_or_create(&error.into())
Expand All @@ -200,9 +201,13 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Ok(libp2p_kad::GetProvidersOk::FoundProviders { providers, .. }) => {
self.query_result_get_providers_ok
.observe(providers.len() as f64);
}
Ok(libp2p_kad::GetProvidersOk::FinishedWithNoAdditionalRecord {
..
}) => {}
Err(error) => {
self.query_result_get_providers_error
.get_or_create(&error.into())
Expand Down
8 changes: 8 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused."
See [PR 3152].

- Refactor APIs to be streaming.
- Renamed `KademliaEvent::OutboundQueryCompleted` to `KademliaEvent::OutboundQueryProgressed`
- Instead of a single event `OutboundQueryCompleted`, there are now multiple events emitted, allowing the user to process them as they come in (via the new `OutboundQueryProgressed`). See `ProgressStep` to identify the final `OutboundQueryProgressed` of a single query.
- To finish a query early, i.e. before the final `OutboundQueryProgressed` of the query, a caller needs to call `query.finish()`.
- There is no more automatic caching of records. The user has to manually call `put_record_to` on the `QueryInfo::GetRecord.cache_candidates` to cache a record to a close peer that did not return the record on the foregone query.
See [PR 2712].

[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
[PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152
[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712

# 0.41.0

Expand Down
Loading

0 comments on commit a997181

Please sign in to comment.