Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Fix not retrieving local records #221

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ pub enum KademliaEvent {
/// The type of the DHT records.
#[derive(Debug, Clone)]
pub enum RecordsType {
/// Record was found in the local store.
/// Record was found in the local store and [`Quorum::One`] was used.
///
/// This contains only a single result.
LocalStore(Record),

/// Records found in the network.
/// Records found in the network. This can include the locally found record.
Network(Vec<PeerRecord>),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Thinking out loud, since we include the local records and the records found from the network, we could try to remove the RecordsType enum all together.
Then, we could mention in the KademliaEvent::GetRecordSuccess that records fetched from the local store are populated first in the list of records ie Vec<PeerRecord> and that users can distinguish between local and network records by comparing the peer ID of the record.

}

Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ impl Kademlia {
key.clone(),
self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(),
quorum,
if record.is_some() { 1 } else { 0 },
record.cloned(),
);
}
}
Expand Down
16 changes: 10 additions & 6 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ pub struct GetRecordContext {

impl GetRecordContext {
/// Create new [`GetRecordContext`].
pub fn new(config: GetRecordConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
pub fn new(
config: GetRecordConfig,
in_peers: VecDeque<KademliaPeer>,
found_records: Vec<PeerRecord>,
) -> Self {
let mut candidates = BTreeMap::new();

for candidate in &in_peers {
Expand All @@ -123,7 +127,7 @@ impl GetRecordContext {
candidates,
pending: HashMap::new(),
queried: HashSet::new(),
found_records: Vec::new(),
found_records,
}
}

Expand Down Expand Up @@ -354,7 +358,7 @@ mod tests {
#[test]
fn completes_when_no_candidates() {
let config = default_config();
let mut context = GetRecordContext::new(config, VecDeque::new());
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
assert!(context.is_done());
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
Expand All @@ -363,7 +367,7 @@ mod tests {
known_records: 1,
..default_config()
};
let mut context = GetRecordContext::new(config, VecDeque::new());
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
assert!(context.is_done());
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
Expand All @@ -381,7 +385,7 @@ mod tests {
assert_eq!(in_peers_set.len(), 3);

let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetRecordContext::new(config, in_peers);
let mut context = GetRecordContext::new(config, in_peers, Vec::new());

for num in 0..3 {
let event = context.next_action().unwrap();
Expand Down Expand Up @@ -420,7 +424,7 @@ mod tests {
assert_eq!(in_peers_set.len(), 3);

let in_peers = [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetRecordContext::new(config, in_peers);
let mut context = GetRecordContext::new(config, in_peers, Vec::new());

// Schedule peer queries.
for num in 0..3 {
Expand Down
16 changes: 12 additions & 4 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl QueryEngine {
target: RecordKey,
candidates: VecDeque<KademliaPeer>,
quorum: Quorum,
count: usize,
local_record: Option<Record>,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
Expand All @@ -290,18 +290,26 @@ impl QueryEngine {
let target = Key::new(target);
let config = GetRecordConfig {
local_peer_id: self.local_peer_id,
known_records: count,
known_records: if local_record.is_some() { 1 } else { 0 },
quorum,
replication_factor: self.replication_factor,
parallelism_factor: self.parallelism_factor,
query: query_id,
target,
};

let found_records = local_record
.into_iter()
.map(|record| PeerRecord {
peer: self.local_peer_id,
record,
})
.collect();

self.queries.insert(
query_id,
QueryType::GetRecord {
context: GetRecordContext::new(config, candidates),
context: GetRecordContext::new(config, candidates, found_records),
},
);

Expand Down Expand Up @@ -731,7 +739,7 @@ mod tests {
]
.into(),
Quorum::All,
3,
None,
);

for _ in 0..4 {
Expand Down
102 changes: 102 additions & 0 deletions tests/protocol/kademlia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,105 @@ async fn get_record_retrieves_remote_records() {
}
}
}

#[tokio::test]
async fn get_record_retrieves_local_and_remote_records() {
let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new().build();
let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new().build();

let config1 = ConfigBuilder::new()
.with_tcp(TcpConfig {
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
..Default::default()
})
.with_libp2p_kademlia(kad_config1)
.build();

let config2 = ConfigBuilder::new()
.with_tcp(TcpConfig {
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
..Default::default()
})
.with_libp2p_kademlia(kad_config2)
.build();

let mut litep2p1 = Litep2p::new(config1).unwrap();
let mut litep2p2 = Litep2p::new(config2).unwrap();

// Let peers jnow about each other
kad_handle1
.add_known_peer(
*litep2p2.local_peer_id(),
litep2p2.listen_addresses().cloned().collect(),
)
.await;
kad_handle2
.add_known_peer(
*litep2p1.local_peer_id(),
litep2p1.listen_addresses().cloned().collect(),
)
.await;

// Store the record on `litep2p1``.
let original_record = Record::new(vec![1, 2, 3], vec![0x01]);
let query1 = kad_handle1.put_record(original_record.clone()).await;

let (mut peer1_stored, mut peer2_stored) = (false, false);
let mut query3 = None;

loop {
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
panic!("record was not retrieved in 10 secs")
}
event = litep2p1.next_event() => {}
event = litep2p2.next_event() => {}
event = kad_handle1.next() => {}
event = kad_handle2.next() => {
match event {
Some(KademliaEvent::IncomingRecord { record: got_record }) => {
assert_eq!(got_record.key, original_record.key);
assert_eq!(got_record.value, original_record.value);
assert_eq!(got_record.publisher.unwrap(), *litep2p1.local_peer_id());
assert!(got_record.expires.is_some());

// Get record.
let query_id = kad_handle2
.get_record(RecordKey::from(vec![1, 2, 3]), Quorum::All).await;
query3 = Some(query_id);
}
Some(KademliaEvent::GetRecordSuccess { query_id: _, records }) => {
match records {
RecordsType::LocalStore(_) => {
panic!("the record was retrieved only from peer2")
}
RecordsType::Network(records) => {
assert_eq!(records.len(), 2);

// Locally retrieved record goes first.
assert_eq!(records[0].peer, *litep2p2.local_peer_id());
assert_eq!(records[0].record.key, original_record.key);
assert_eq!(records[0].record.value, original_record.value);
assert_eq!(records[0].record.publisher.unwrap(), *litep2p1.local_peer_id());
assert!(records[0].record.expires.is_some());

// Remote record from peer 1.
assert_eq!(records[1].peer, *litep2p1.local_peer_id());
assert_eq!(records[1].record.key, original_record.key);
assert_eq!(records[1].record.value, original_record.value);
assert_eq!(records[1].record.publisher.unwrap(), *litep2p1.local_peer_id());
assert!(records[1].record.expires.is_some());

break
}
}
}
Some(KademliaEvent::QueryFailed { query_id: _ }) => {
panic!("peer2 query failed")
}
_ => {}
}
}
}
}
}