Skip to content

Commit

Permalink
feat: Send full wantlist every 30 seconds (#7)
Browse files Browse the repository at this point in the history
* feat: Send full wantlist every 30 seconds

This behavior is part boxo/bitswap. It is not in the spec but it
doesn't violate it.

* fix tests
  • Loading branch information
oblique authored Jan 19, 2024
1 parent 95a90d7 commit ac4eb12
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 34 deletions.
92 changes: 92 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ bytes = "1.5.0"
cid = "0.11.0"
fnv = "1.0.7"
futures = "0.3.30"
futures-timer = "3.0.2"
libp2p = "0.53.2"
multihash = "0.19.1"
multihash-codetable = "0.1.1"
Expand All @@ -25,3 +26,6 @@ void = "1.0.2"
hex = "0.4.3"
multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] }
tokio = { version = "1.35.1", features = ["rt", "macros", "time"] }

[features]
wasm-bindgen = ["futures-timer/wasm-bindgen"]
81 changes: 47 additions & 34 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use std::time::{Duration, Instant};
use std::time::Duration;

use asynchronous_codec::FramedWrite;
use blockstore::{Blockstore, BlockstoreError};
Expand All @@ -12,6 +12,7 @@ use futures::future::{AbortHandle, Abortable, BoxFuture};
use futures::stream::FuturesUnordered;
use futures::task::AtomicWaker;
use futures::{FutureExt, SinkExt, StreamExt};
use futures_timer::Delay;
use libp2p::swarm::NotifyHandler;
use libp2p::PeerId;
use libp2p::{
Expand Down Expand Up @@ -75,13 +76,14 @@ where
next_query_id: u64,
waker: Arc<AtomicWaker>,
multihasher: Arc<MultihasherTable<S>>,
send_full_timer: Delay,
}

#[derive(Debug)]
struct PeerState<const S: usize> {
sending: Arc<Mutex<SendingState>>,
wantlist: WantlistState<S>,
last_send_full_tm: Option<Instant>,
send_full: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -117,6 +119,7 @@ where
next_query_id: 0,
waker: Arc::new(AtomicWaker::new()),
multihasher,
send_full_timer: Delay::new(SEND_FULL_INTERVAL),
})
}

Expand All @@ -126,7 +129,7 @@ where
PeerState {
sending: Arc::new(Mutex::new(SendingState::Ready)),
wantlist: WantlistState::new(),
last_send_full_tm: None,
send_full: true,
},
);

Expand Down Expand Up @@ -294,12 +297,7 @@ where
continue;
}
}
SendingState::Ready => match state.last_send_full_tm {
// Send full list if interval time is elapsed.
Some(tm) => tm.elapsed() >= SEND_FULL_INTERVAL,
// Send full list the first time.
None => true,
},
SendingState::Ready => state.send_full,
// State is poisoned, send full list to recover.
SendingState::Poisoned => true,
};
Expand All @@ -310,18 +308,15 @@ where
state.wantlist.generate_proto_update(&self.wantlist)
};

if wantlist.entries.is_empty() {
// Nothing to send
//
// TODO: What if the send_full is true? Shouldn't we send it to clear
// the wantlist? However we should do it once.
// Allow empty entries to be sent when send_full flag is set.
if send_full {
// Reset flag
state.send_full = false;
} else if wantlist.entries.is_empty() {
// No updates to be sent for this peer
continue;
}

if wantlist.full {
state.last_send_full_tm = Some(Instant::now());
}

self.queue.push_back(ToSwarm::NotifyHandler {
peer_id: peer.to_owned(),
handler: NotifyHandler::Any,
Expand All @@ -345,6 +340,16 @@ where
return Poll::Ready(ev);
}

if self.send_full_timer.poll_unpin(cx).is_ready() {
for state in self.peers.values_mut() {
state.send_full = true;
}

// Reset timer and loop again to get it registered
self.send_full_timer.reset(SEND_FULL_INTERVAL);
continue;
}

if let Poll::Ready(Some(task_result)) = self.tasks.poll_next_unpin(cx) {
match task_result {
// Blockstore already has the data so return them to the user
Expand Down Expand Up @@ -686,28 +691,36 @@ mod tests {

// Simulate that full wantlist is needed
for peer_state in client.peers.values_mut() {
*peer_state.last_send_full_tm.as_mut().unwrap() -= SEND_FULL_INTERVAL;
peer_state.send_full = true;
}

let ev = poll_fn(|cx| client.poll(cx)).await;
let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);

// wantlist should be generated only for peer2, because peer1 already replied with DontHave
assert_eq!(peer_id, peer2);
assert_eq!(wantlist.entries.len(), 1);
assert!(wantlist.full);
for _ in 0..2 {
let ev = poll_fn(|cx| client.poll(cx)).await;
let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev);

let entry = &wantlist.entries[0];
assert_eq!(entry.block, cid1.to_bytes());
assert!(!entry.cancel);
assert_eq!(entry.wantType, WantType::Have);
assert!(entry.sendDontHave);
if peer_id == peer1 {
// full wantlist of peer1 will be empty because it alreayd replied with DontHave
assert!(wantlist.entries.is_empty());
assert!(wantlist.full);
} else if peer_id == peer2 {
assert_eq!(wantlist.entries.len(), 1);
assert!(wantlist.full);

let entry = &wantlist.entries[0];
assert_eq!(entry.block, cid1.to_bytes());
assert!(!entry.cancel);
assert_eq!(entry.wantType, WantType::Have);
assert!(entry.sendDontHave);
} else {
panic!("Unknown peer id");
}

// Mark send state as ready
*send_state.lock().unwrap() = SendingState::Ready;
// Mark send state as ready
*send_state.lock().unwrap() = SendingState::Ready;
}

// No other events should be produced
assert!(poll_fn_once(|cx| client.poll(cx)).await.is_none());
assert!(dbg!(poll_fn_once(|cx| client.poll(cx)).await).is_none());
}

#[tokio::test]
Expand Down

0 comments on commit ac4eb12

Please sign in to comment.