Skip to content

Commit

Permalink
kad: Providers part 6: stop providing (#245)
Browse files Browse the repository at this point in the history
Allow removing a local content provider and stopping refreshing it.
  • Loading branch information
dmitry-markin authored Sep 30, 2024
1 parent 0f865fb commit e9f4f97
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 11 deletions.
15 changes: 15 additions & 0 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ pub(crate) enum KademliaCommand {
query_id: QueryId,
},

/// Stop providing the key locally and refreshing the provider.
StopProviding {
/// Provided key.
key: RecordKey,
},

/// Store record locally.
StoreRecord {
// Record.
Expand Down Expand Up @@ -364,6 +370,15 @@ impl KademliaHandle {
query_id
}

/// Stop providing the key on the DHT.
///
/// This will stop republishing the provider, but won't
/// remove it instantly from the nodes. It will be removed from them after the provider TTL
/// expires, set by default to 48 hours.
pub async fn stop_providing(&mut self, key: RecordKey) {
let _ = self.cmd_tx.send(KademliaCommand::StopProviding { key }).await;
}

/// Get providers from DHT.
///
/// Returns [`Err`] only if `Kademlia` is terminating.
Expand Down
13 changes: 12 additions & 1 deletion src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ impl Kademlia {
query = ?query_id,
?key,
?public_addresses,
"register as content provider"
"register as a content provider",
);

let provider = ProviderRecord {
Expand All @@ -1064,6 +1064,17 @@ impl Kademlia {
.into(),
);
}
Some(KademliaCommand::StopProviding {
key,
}) => {
tracing::debug!(
target: LOG_TARGET,
?key,
"stop providing",
);

self.store.remove_local_provider(key);
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");

Expand Down
57 changes: 47 additions & 10 deletions src/protocol/libp2p/kademlia/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@

//! Memory store implementation for Kademlia.
#![allow(unused)]
use crate::{
protocol::libp2p::kademlia::{
config::DEFAULT_PROVIDER_REFRESH_INTERVAL,
futures_stream::FuturesStream,
record::{Key, ProviderRecord, Record},
types::Key as KademliaKey,
},
PeerId,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use futures::{future::BoxFuture, StreamExt};
use std::{
collections::{hash_map::Entry, HashMap},
num::NonZeroUsize,
time::Duration,
};

Expand Down Expand Up @@ -63,6 +62,7 @@ pub struct MemoryStore {

impl MemoryStore {
/// Create new [`MemoryStore`].
#[cfg(test)]
pub fn new(local_peer_id: PeerId) -> Self {
Self {
local_peer_id,
Expand Down Expand Up @@ -149,14 +149,14 @@ impl MemoryStore {
///
/// Returns a non-empty list of providers, if any.
pub fn get_providers(&mut self, key: &Key) -> Vec<ProviderRecord> {
let drop = self.provider_keys.get_mut(key).map_or(false, |providers| {
let drop_key = self.provider_keys.get_mut(key).map_or(false, |providers| {
let now = std::time::Instant::now();
providers.retain(|p| !p.is_expired(now));

providers.is_empty()
});

if drop {
if drop_key {
self.provider_keys.remove(key);

Vec::default()
Expand Down Expand Up @@ -211,7 +211,7 @@ impl MemoryStore {
false
},
Entry::Occupied(mut entry) => {
let mut providers = entry.get_mut();
let providers = entry.get_mut();

// Providers under every key are sorted by distance from the provided key, with
// equal distances meaning peer IDs (more strictly, their hashes)
Expand Down Expand Up @@ -261,6 +261,46 @@ impl MemoryStore {
}
}

/// Remove local provider for `key`.
pub fn remove_local_provider(&mut self, key: Key) {
if self.local_providers.remove(&key).is_none() {
tracing::warn!(?key, "trying to remove nonexistent local provider",);
return;
};

match self.provider_keys.entry(key.clone()) {
Entry::Vacant(_) => {
tracing::error!(?key, "local provider key not found during removal",);
debug_assert!(false);
return;
}
Entry::Occupied(mut entry) => {
let providers = entry.get_mut();

// Providers are sorted by distance.
let local_provider_distance = KademliaKey::from(self.local_peer_id.clone())
.distance(&KademliaKey::new(key.clone()));
let provider_position =
providers.binary_search_by(|p| p.distance().cmp(&local_provider_distance));

match provider_position {
Ok(i) => {
providers.remove(i);
}
Err(_) => {
tracing::error!(?key, "local provider not found during removal",);
debug_assert!(false);
return;
}
}

if providers.is_empty() {
entry.remove();
}
}
};
}

/// Poll next action from the store.
pub async fn next_action(&mut self) -> Option<MemoryStoreAction> {
// [`FuturesStream`] never terminates, so `map()` below is always triggered.
Expand Down Expand Up @@ -328,10 +368,7 @@ impl Default for MemoryStoreConfig {
mod tests {
use super::*;
use crate::PeerId;
use multiaddr::{
multiaddr,
Protocol::{Ip4, Tcp},
};
use multiaddr::multiaddr;

#[test]
fn put_get_record() {
Expand Down

0 comments on commit e9f4f97

Please sign in to comment.