Skip to content

Commit

Permalink
fix(disc) Send find_node request only after verifying our endpoint p…
Browse files Browse the repository at this point in the history
…roof (#4909)
  • Loading branch information
0xprames authored Oct 7, 2023
1 parent 6fd66b3 commit 5de0443
Showing 1 changed file with 113 additions and 19 deletions.
132 changes: 113 additions & 19 deletions crates/net/discv4/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ pub struct Discv4Service {
queued_pings: VecDeque<(NodeRecord, PingReason)>,
/// Currently active pings to specific nodes.
pending_pings: HashMap<PeerId, PingRequest>,
/// Currently active endpoint proof verification lookups to specific nodes.
///
/// An entry here means we've proven the peer's endpoint but haven't yet completed our end of the
/// endpoint proof.
pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
/// Currently active FindNode requests
pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
/// Currently active ENR requests
Expand Down Expand Up @@ -546,6 +551,7 @@ impl Discv4Service {
egress: egress_tx,
queued_pings: Default::default(),
pending_pings: Default::default(),
pending_lookup: Default::default(),
pending_find_nodes: Default::default(),
pending_enr_requests: Default::default(),
commands_rx,
Expand Down Expand Up @@ -988,10 +994,17 @@ impl Discv4Service {
// the ping interval
let mut is_new_insert = false;
let mut needs_bond = false;
let mut is_proven = false;

let old_enr = match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => entry.value_mut().update_with_enr(ping.enr_sq),
kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(ping.enr_sq),
kbucket::Entry::Present(mut entry, _) => {
is_proven = entry.value().has_endpoint_proof;
entry.value_mut().update_with_enr(ping.enr_sq)
}
kbucket::Entry::Pending(mut entry, _) => {
is_proven = entry.value().has_endpoint_proof;
entry.value().update_with_enr(ping.enr_sq)
}
kbucket::Entry::Absent(entry) => {
let mut node = NodeEntry::new(record);
node.last_enr_seq = ping.enr_sq;
Expand Down Expand Up @@ -1044,6 +1057,19 @@ impl Discv4Service {
self.try_ping(record, PingReason::InitialInsert);
} else if needs_bond {
self.try_ping(record, PingReason::EstablishBond);
} else if is_proven {
// if node has been proven, this means we've received a pong and verified its endpoint
// proof. We've also sent a pong above to verify our endpoint proof, so we can now
// send our find_nodes request if PingReason::Lookup
if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
if self.pending_find_nodes.contains_key(&record.id) {
// there's already another pending request, unmark it so the next round can
// try to send it
ctx.unmark_queried(record.id);
} else {
self.find_node(&record, ctx);
}
}
} else {
// Request ENR if included in the ping
match (ping.enr_sq, old_enr) {
Expand Down Expand Up @@ -1156,13 +1182,11 @@ impl Discv4Service {
}
PingReason::Lookup(node, ctx) => {
self.update_on_pong(node, pong.enr_sq);
if self.pending_find_nodes.contains_key(&node.id) {
// there's already another pending request, unmark it so the next round can try
// to send it
ctx.unmark_queried(node.id);
} else {
self.find_node(&node, ctx);
}
// insert node and assoc. lookup_context into the pending_lookup table to complete
// our side of the endpoint proof verification.
// Start the lookup timer here - and evict accordingly. Note that this is a separate
// timer from the ping_request timer.
self.pending_lookup.insert(node.id, (Instant::now(), ctx));
}
}
}
Expand Down Expand Up @@ -1279,7 +1303,7 @@ impl Discv4Service {
};

// This is the recursive lookup step where we initiate new FindNode requests for new nodes
// that where discovered.
// that were discovered.
for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
// prevent banned peers from being added to the context
if self.config.ban_list.is_banned(&node.id, &node.address) {
Expand All @@ -1299,7 +1323,8 @@ impl Discv4Service {
match self.kbuckets.entry(&key) {
BucketEntry::Absent(entry) => {
// the node's endpoint is not proven yet, so we need to ping it first, on
// success, it will initiate a `FindNode` request.
// success, we will add the node to the pending_lookup table, and wait to send
// back a Pong before initiating a FindNode request.
// In order to prevent that this node is selected again on subsequent responses,
// while the ping is still active, we always mark it as queried.
ctx.mark_queried(closest.id);
Expand Down Expand Up @@ -1365,6 +1390,21 @@ impl Discv4Service {
self.remove_node(node_id);
}

let mut failed_lookups = Vec::new();
self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
if now.duration_since(*lookup_sent_at) > self.config.ping_expiration {
failed_lookups.push(*node_id);
return false
}
true
});
debug!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");

// remove nodes that failed the e2e lookup process, so we can restart it
for node_id in failed_lookups {
self.remove_node(node_id);
}

self.evict_failed_neighbours(now);
}

Expand Down Expand Up @@ -1802,7 +1842,7 @@ impl LookupTargetRotator {
/// Tracks lookups across multiple `FindNode` requests.
///
/// If this type is dropped by all
#[derive(Clone)]
#[derive(Clone, Debug)]
struct LookupContext {
/// Reference-counted handle to the lookup state. Because `Clone` is derived, cloning a
/// `LookupContext` clones the `Rc`, so all clones share the same `LookupContextInner`.
inner: Rc<LookupContextInner>,
}
Expand Down Expand Up @@ -1905,7 +1945,7 @@ impl LookupContext {
// guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`] clones of
// [`LookupContext`].
unsafe impl Send for LookupContext {}

#[derive(Debug)]
struct LookupContextInner {
/// The target to lookup.
target: discv5::Key<NodeKey>,
Expand Down Expand Up @@ -2272,7 +2312,7 @@ mod tests {
reth_tracing::init_test_tracing();

let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;

let id = PeerId::random();
let key = kad_key(id);
Expand All @@ -2296,9 +2336,65 @@ mod tests {

Poll::Ready(())
})
.await
.await;
}

/// Checks that, during a recursive lookup, a `FindNode` request to a newly discovered node is
/// only registered after both directions of the endpoint proof have been observed: `service`
/// receives the pong for its own ping, and `service` receives a ping from the other node.
#[tokio::test]
async fn test_on_neighbours_recursive_lookup() {
reth_tracing::init_test_tracing();

let config = Discv4Config::builder().build();
// Two services built from the same config: `service` drives the lookup, `service2` plays the
// node that gets discovered through the `Neighbours` response.
let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
let (_discv4, mut service2) = create_discv4_with_config(config).await;

// A random peer that is inserted as an already proven node, so that the self lookup below has
// a node to query.
let id = PeerId::random();
let key = kad_key(id);
let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

let _ = service.kbuckets.insert_or_update(
&key,
NodeEntry::new_proven(record),
NodeStatus {
direction: ConnectionDirection::Incoming,
state: ConnectionState::Connected,
},
);
// Needed in this test to populate self.pending_find_nodes as a prerequisite for a valid
// on_neighbours request
service.lookup_self();
assert_eq!(service.pending_find_nodes.len(), 1);

// Polling the service once must not change the number of pending `FindNode` requests.
poll_fn(|cx| {
let _ = service.poll(cx);
assert_eq!(service.pending_find_nodes.len(), 1);

Poll::Ready(())
})
.await;

// NOTE(review): the large offset presumably just pushes the expiry far enough into the future
// that the message is never treated as expired - confirm against the expiry check.
let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
10000000000000;
// Hand `service` a `Neighbours` response, attributed to the proven peer, that advertises
// `service2` as a newly discovered node.
let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
service.on_neighbours(msg, record.tcp_addr(), id);
// wait for the processed ping
let event = poll_fn(|cx| service2.poll(cx)).await;
assert_eq!(event, Discv4Event::Ping);
// assert that no find_node req has been added here on top of the initial one, since both
// sides of the endpoint proof are not completed here
assert_eq!(service.pending_find_nodes.len(), 1);
// we now wait for PONG
let event = poll_fn(|cx| service.poll(cx)).await;
assert_eq!(event, Discv4Event::Pong);
// Ideally we want to assert against service.pending_lookup.len() here - but because
// service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
// drained almost immediately - and there is no way to grab a handle to its intermediary state
// here :(
let event = poll_fn(|cx| service.poll(cx)).await;
assert_eq!(event, Discv4Event::Ping);
// assert that we've added the find_node req here after both sides of the endpoint proof are
// done
assert_eq!(service.pending_find_nodes.len(), 2);
}
#[tokio::test]
async fn test_no_local_in_closest() {
reth_tracing::init_test_tracing();
Expand Down Expand Up @@ -2463,14 +2559,12 @@ mod tests {
// we now wait for PONG
let event = poll_fn(|cx| service_2.poll(cx)).await;

// Since the endpoint was already proven from 1 POV it can already send a FindNode so the
// next event is either the PONG or Find Node
match event {
Discv4Event::FindNode | Discv4Event::EnrRequest => {
Discv4Event::EnrRequest => {
// since we support enr in the ping it may also request the enr
let event = poll_fn(|cx| service_2.poll(cx)).await;
match event {
Discv4Event::FindNode | Discv4Event::EnrRequest => {
Discv4Event::EnrRequest => {
let event = poll_fn(|cx| service_2.poll(cx)).await;
assert_eq!(event, Discv4Event::Pong);
}
Expand Down

0 comments on commit 5de0443

Please sign in to comment.