From ac4eb121fc0a77ca76436d2ad1a20f4db692f82d Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 19 Jan 2024 16:43:41 +0200 Subject: [PATCH] feat: Send full wantlist every 30 seconds (#7) * feat: Send full wantlist every 30 seconds This behavior is part boxo/bitswap. It is not in the spec but it doesn't violate it. * fix tests --- Cargo.lock | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 4 +++ src/client.rs | 81 ++++++++++++++++++++++++++------------------- 3 files changed, 143 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dec2bb9..469b95b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,7 @@ dependencies = [ "cid", "fnv", "futures", + "futures-timer", "hex", "libp2p", "multihash", @@ -180,6 +181,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + [[package]] name = "byteorder" version = "1.5.0" @@ -482,6 +489,10 @@ name = "futures-timer" version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +dependencies = [ + "gloo-timers", + "send_wrapper", +] [[package]] name = "futures-util" @@ -528,6 +539,18 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "hashbrown" version = "0.14.3" @@ -593,6 +616,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "js-sys" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "keccak" version = "0.1.5" @@ -1106,6 +1138,12 @@ version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +[[package]] +name = "send_wrapper" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" + [[package]] name = "serde" version = "1.0.195" @@ -1430,6 +1468,60 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.48", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index 6f0b4d1..41e50ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ bytes = "1.5.0" cid = "0.11.0" fnv = "1.0.7" futures = "0.3.30" +futures-timer = "3.0.2" libp2p = "0.53.2" multihash = "0.19.1" multihash-codetable = "0.1.1" @@ -25,3 +26,6 @@ void = "1.0.2" hex = "0.4.3" multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] } tokio = { version = "1.35.1", features = ["rt", "macros", "time"] } + +[features] +wasm-bindgen = ["futures-timer/wasm-bindgen"] diff --git a/src/client.rs b/src/client.rs index 27b5996..30a1cfb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::fmt; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; use asynchronous_codec::FramedWrite; use blockstore::{Blockstore, BlockstoreError}; @@ -12,6 +12,7 @@ use futures::future::{AbortHandle, Abortable, BoxFuture}; use futures::stream::FuturesUnordered; use futures::task::AtomicWaker; use futures::{FutureExt, SinkExt, StreamExt}; +use futures_timer::Delay; use libp2p::swarm::NotifyHandler; use libp2p::PeerId; use libp2p::{ @@ -75,13 +76,14 @@ where next_query_id: u64, waker: Arc, multihasher: Arc>, + send_full_timer: Delay, } #[derive(Debug)] struct PeerState { sending: Arc>, wantlist: WantlistState, - last_send_full_tm: Option, + send_full: bool, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -117,6 +119,7 @@ where next_query_id: 0, waker: Arc::new(AtomicWaker::new()), multihasher, + send_full_timer: Delay::new(SEND_FULL_INTERVAL), }) } @@ -126,7 +129,7 @@ where PeerState { sending: Arc::new(Mutex::new(SendingState::Ready)), wantlist: WantlistState::new(), - last_send_full_tm: None, + send_full: true, }, ); @@ -294,12 +297,7 @@ where continue; } } - SendingState::Ready => match state.last_send_full_tm { - // Send full list if interval time is elapsed. - Some(tm) => tm.elapsed() >= SEND_FULL_INTERVAL, - // Send full list the first time. - None => true, - }, + SendingState::Ready => state.send_full, // State is poisoned, send full list to recover. SendingState::Poisoned => true, }; @@ -310,18 +308,15 @@ where state.wantlist.generate_proto_update(&self.wantlist) }; - if wantlist.entries.is_empty() { - // Nothing to send - // - // TODO: What if the send_full is true? Shouldn't we send it to clear - // the wantlist? However we should do it once. + // Allow empty entries to be sent when send_full flag is set. + if send_full { + // Reset flag + state.send_full = false; + } else if wantlist.entries.is_empty() { + // No updates to be sent for this peer continue; } - if wantlist.full { - state.last_send_full_tm = Some(Instant::now()); - } - self.queue.push_back(ToSwarm::NotifyHandler { peer_id: peer.to_owned(), handler: NotifyHandler::Any, @@ -345,6 +340,16 @@ where return Poll::Ready(ev); } + if self.send_full_timer.poll_unpin(cx).is_ready() { + for state in self.peers.values_mut() { + state.send_full = true; + } + + // Reset timer and loop again to get it registered + self.send_full_timer.reset(SEND_FULL_INTERVAL); + continue; + } + if let Poll::Ready(Some(task_result)) = self.tasks.poll_next_unpin(cx) { match task_result { // Blockstore already has the data so return them to the user @@ -686,28 +691,36 @@ mod tests { // Simulate that full wantlist is needed for peer_state in client.peers.values_mut() { - *peer_state.last_send_full_tm.as_mut().unwrap() -= SEND_FULL_INTERVAL; + peer_state.send_full = true; } - let ev = poll_fn(|cx| client.poll(cx)).await; - let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); - - // wantlist should be generated only for peer2, because peer1 already replied with DontHave - assert_eq!(peer_id, peer2); - assert_eq!(wantlist.entries.len(), 1); - assert!(wantlist.full); + for _ in 0..2 { + let ev = poll_fn(|cx| client.poll(cx)).await; + let (peer_id, wantlist, send_state) = expect_send_wantlist_event(ev); - let entry = &wantlist.entries[0]; - assert_eq!(entry.block, cid1.to_bytes()); - assert!(!entry.cancel); - assert_eq!(entry.wantType, WantType::Have); - assert!(entry.sendDontHave); + if peer_id == peer1 { + // full wantlist of peer1 will be empty because it alreayd replied with DontHave + assert!(wantlist.entries.is_empty()); + assert!(wantlist.full); + } else if peer_id == peer2 { + assert_eq!(wantlist.entries.len(), 1); + assert!(wantlist.full); + + let entry = &wantlist.entries[0]; + assert_eq!(entry.block, cid1.to_bytes()); + assert!(!entry.cancel); + assert_eq!(entry.wantType, WantType::Have); + assert!(entry.sendDontHave); + } else { + panic!("Unknown peer id"); + } - // Mark send state as ready - *send_state.lock().unwrap() = SendingState::Ready; + // Mark send state as ready + *send_state.lock().unwrap() = SendingState::Ready; + } // No other events should be produced - assert!(poll_fn_once(|cx| client.poll(cx)).await.is_none()); + assert!(dbg!(poll_fn_once(|cx| client.poll(cx)).await).is_none()); } #[tokio::test]