Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #5552 from ethereum/refactor-packet-received
Browse files Browse the repository at this point in the history
Split NodeTable::onPacketReceived into separate handler methods
  • Loading branch information
gumb0 authored Apr 9, 2019
2 parents 7c6e6da + 0809592 commit dbbfd5b
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 167 deletions.
355 changes: 188 additions & 167 deletions libp2p/NodeTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,176 +466,21 @@ void NodeTable::onPacketReceived(
shared_ptr<NodeEntry> sourceNodeEntry;
switch (packet->packetType())
{
case Pong::type:
{
// validate pong
auto const sentPing = m_sentPings.find(_from);
if (sentPing == m_sentPings.end())
{
LOG(m_logger) << "Unexpected PONG from " << _from.address().to_string() << ":"
<< _from.port();
return;
}

auto const& pong = dynamic_cast<Pong const&>(*packet);
auto const& nodeValidation = sentPing->second;
if (pong.echo != nodeValidation.pingHash)
{
LOG(m_logger) << "Invalid PONG from " << _from.address().to_string() << ":"
<< _from.port();
return;
}

// in case the node answers with new NodeID, drop the record with the old NodeID
auto const& sourceId = pong.sourceid;
if (sourceId != nodeValidation.nodeID)
{
LOG(m_logger) << "Node " << _from << " changed public key from "
<< nodeValidation.nodeID << " to " << sourceId;
if (auto node = nodeEntry(nodeValidation.nodeID))
dropNode(move(node));
}

// create or update nodeEntry with new Pong received time
DEV_GUARDED(x_nodes)
{
auto it = m_allNodes.find(sourceId);
if (it == m_allNodes.end())
sourceNodeEntry = make_shared<NodeEntry>(m_hostNodeID, sourceId,
NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort},
RLPXDatagramFace::secondsSinceEpoch(), 0 /* lastPongSentTime */);
else
{
sourceNodeEntry = it->second;
sourceNodeEntry->lastPongReceivedTime =
RLPXDatagramFace::secondsSinceEpoch();

if (sourceNodeEntry->endpoint() != _from)
sourceNodeEntry->node.endpoint = NodeIPEndpoint{
_from.address(), _from.port(), nodeValidation.tcpPort};
}
}

m_sentPings.erase(_from);

// update our endpoint address and UDP port
DEV_GUARDED(x_nodes)
{
if ((!m_hostNodeEndpoint || !isAllowedEndpoint(m_hostNodeEndpoint)) &&
isPublicAddress(pong.destination.address()))
m_hostNodeEndpoint.setAddress(pong.destination.address());
m_hostNodeEndpoint.setUdpPort(pong.destination.udpPort());
}
break;
}

case Neighbours::type:
{
sourceNodeEntry = nodeEntry(packet->sourceid);
if (!sourceNodeEntry)
{
LOG(m_logger) << "Source node (" << packet->sourceid << "@" << _from
<< ") not found in node table. Ignoring Neighbours packet.";
return;
}
if (sourceNodeEntry->endpoint() != _from)
{
LOG(m_logger) << "Neighbours packet from unexpected endpoint " << _from
<< " instead of " << sourceNodeEntry->endpoint();
return;
}

auto const& in = dynamic_cast<Neighbours const&>(*packet);

bool expected = false;
auto now = chrono::steady_clock::now();
m_sentFindNodes.remove_if([&](NodeIdTimePoint const& _t) noexcept {
if (_t.first != in.sourceid)
return false;
if (now - _t.second < c_reqTimeoutMs)
expected = true;
return true;
});
if (!expected)
{
cnetdetails << "Dropping unsolicited neighbours packet from "
<< _from.address();
break;
}

for (auto const& n : in.neighbours)
addNode(Node(n.node, n.endpoint));
break;
}

case FindNode::type:
{
sourceNodeEntry = nodeEntry(packet->sourceid);
if (!sourceNodeEntry)
{
LOG(m_logger) << "Source node (" << packet->sourceid << "@" << _from
<< ") not found in node table. Ignoring FindNode request.";
return;
}
if (sourceNodeEntry->endpoint() != _from)
{
LOG(m_logger) << "FindNode packet from unexpected endpoint " << _from
<< " instead of " << sourceNodeEntry->endpoint();
return;
}
if (!sourceNodeEntry->lastPongReceivedTime)
{
LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof hasn't been performed yet.";
return;
}
if (!sourceNodeEntry->hasValidEndpointProof())
{
LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof has expired.";
return;
}
case Pong::type:
sourceNodeEntry = handlePong(_from, *packet);
break;

auto const& in = dynamic_cast<FindNode const&>(*packet);
vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
static unsigned constexpr nlimit = (NodeSocket::maxDatagramSize - 109) / 90;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{
Neighbours out(_from, nearest, offset, nlimit);
out.ts = nextRequestExpirationTime();
LOG(m_logger) << out.typeName() << " to " << in.sourceid << "@" << _from;
out.sign(m_secret);
if (out.data.size() > 1280)
cnetlog << "Sending truncated datagram, size: " << out.data.size();
m_socket->send(out);
}
break;
}
case Neighbours::type:
sourceNodeEntry = handleNeighbours(_from, *packet);
break;

case PingNode::type:
{
auto& in = dynamic_cast<PingNode&>(*packet);
in.source.setAddress(_from.address());
in.source.setUdpPort(_from.port());
if (!addNode({in.sourceid, in.source}))
return; // Need to have valid endpoint proof before adding node to node table.

// Send PONG response.
Pong p(in.source);
LOG(m_logger) << p.typeName() << " to " << in.sourceid << "@" << _from;
p.ts = nextRequestExpirationTime();
p.echo = in.echo;
p.sign(m_secret);
m_socket->send(p);

// Quirk: when the node is a replacement node (that is, not added to the node table
// yet, but can be added after another node's eviction), it will not be returned
// from nodeEntry() and we won't update its lastPongSentTime. But that shouldn't be
// a big problem, at worst it can lead to more Ping-Pongs than needed.
sourceNodeEntry = nodeEntry(packet->sourceid);
if (sourceNodeEntry)
sourceNodeEntry->lastPongSentTime = RLPXDatagramFace::secondsSinceEpoch();
case FindNode::type:
sourceNodeEntry = handleFindNode(_from, *packet);
break;

break;
}
case PingNode::type:
sourceNodeEntry = handlePingNode(_from, *packet);
break;
}

if (sourceNodeEntry)
Expand All @@ -653,6 +498,182 @@ void NodeTable::onPacketReceived(
}
}

shared_ptr<NodeEntry> NodeTable::handlePong(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
{
// validate pong
auto const sentPing = m_sentPings.find(_from);
if (sentPing == m_sentPings.end())
{
LOG(m_logger) << "Unexpected PONG from " << _from.address().to_string() << ":"
<< _from.port();
return {};
}

auto const& pong = dynamic_cast<Pong const&>(_packet);
auto const& nodeValidation = sentPing->second;
if (pong.echo != nodeValidation.pingHash)
{
LOG(m_logger) << "Invalid PONG from " << _from.address().to_string() << ":" << _from.port();
return {};
}

// in case the node answers with new NodeID, drop the record with the old NodeID
auto const& sourceId = pong.sourceid;
if (sourceId != nodeValidation.nodeID)
{
LOG(m_logger) << "Node " << _from << " changed public key from " << nodeValidation.nodeID
<< " to " << sourceId;
if (auto node = nodeEntry(nodeValidation.nodeID))
dropNode(move(node));
}

// create or update nodeEntry with new Pong received time
shared_ptr<NodeEntry> sourceNodeEntry;
DEV_GUARDED(x_nodes)
{
auto it = m_allNodes.find(sourceId);
if (it == m_allNodes.end())
sourceNodeEntry = make_shared<NodeEntry>(m_hostNodeID, sourceId,
NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort},
RLPXDatagramFace::secondsSinceEpoch(), 0 /* lastPongSentTime */);
else
{
sourceNodeEntry = it->second;
sourceNodeEntry->lastPongReceivedTime = RLPXDatagramFace::secondsSinceEpoch();

if (sourceNodeEntry->endpoint() != _from)
sourceNodeEntry->node.endpoint =
NodeIPEndpoint{_from.address(), _from.port(), nodeValidation.tcpPort};
}
}

m_sentPings.erase(_from);

// update our endpoint address and UDP port
DEV_GUARDED(x_nodes)
{
if ((!m_hostNodeEndpoint || !isAllowedEndpoint(m_hostNodeEndpoint)) &&
isPublicAddress(pong.destination.address()))
m_hostNodeEndpoint.setAddress(pong.destination.address());
m_hostNodeEndpoint.setUdpPort(pong.destination.udpPort());
}

return sourceNodeEntry;
}

shared_ptr<NodeEntry> NodeTable::handleNeighbours(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
{
shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
if (!sourceNodeEntry)
{
LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
<< ") not found in node table. Ignoring Neighbours packet.";
return {};
}
if (sourceNodeEntry->endpoint() != _from)
{
LOG(m_logger) << "Neighbours packet from unexpected endpoint " << _from << " instead of "
<< sourceNodeEntry->endpoint();
return {};
}

auto const& in = dynamic_cast<Neighbours const&>(_packet);

bool expected = false;
auto now = chrono::steady_clock::now();
m_sentFindNodes.remove_if([&](NodeIdTimePoint const& _t) noexcept {
if (_t.first != in.sourceid)
return false;
if (now - _t.second < c_reqTimeoutMs)
expected = true;
return true;
});
if (!expected)
{
cnetdetails << "Dropping unsolicited neighbours packet from " << _from.address();
return sourceNodeEntry;
}

for (auto const& n : in.neighbours)
addNode(Node(n.node, n.endpoint));

return sourceNodeEntry;
}

std::shared_ptr<NodeEntry> NodeTable::handleFindNode(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
{
std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
if (!sourceNodeEntry)
{
LOG(m_logger) << "Source node (" << _packet.sourceid << "@" << _from
<< ") not found in node table. Ignoring FindNode request.";
return {};
}
if (sourceNodeEntry->endpoint() != _from)
{
LOG(m_logger) << "FindNode packet from unexpected endpoint " << _from << " instead of "
<< sourceNodeEntry->endpoint();
return {};
}
if (!sourceNodeEntry->lastPongReceivedTime)
{
LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof hasn't been performed yet.";
return {};
}
if (!sourceNodeEntry->hasValidEndpointProof())
{
LOG(m_logger) << "Unexpected FindNode packet! Endpoint proof has expired.";
return {};
}

auto const& in = dynamic_cast<FindNode const&>(_packet);
vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);
static unsigned constexpr nlimit = (NodeSocket::maxDatagramSize - 109) / 90;
for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
{
Neighbours out(_from, nearest, offset, nlimit);
out.ts = nextRequestExpirationTime();
LOG(m_logger) << out.typeName() << " to " << in.sourceid << "@" << _from;
out.sign(m_secret);
if (out.data.size() > 1280)
cnetlog << "Sending truncated datagram, size: " << out.data.size();
m_socket->send(out);
}

return sourceNodeEntry;
}

std::shared_ptr<NodeEntry> NodeTable::handlePingNode(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
{
auto const& in = dynamic_cast<PingNode const&>(_packet);

NodeIPEndpoint sourceEndpoint{_from.address(), _from.port(), in.source.tcpPort()};
if (!addNode({in.sourceid, sourceEndpoint}))
return {}; // Need to have valid endpoint proof before adding node to node table.

// Send PONG response.
Pong p(sourceEndpoint);
LOG(m_logger) << p.typeName() << " to " << in.sourceid << "@" << _from;
p.ts = nextRequestExpirationTime();
p.echo = in.echo;
p.sign(m_secret);
m_socket->send(p);

// Quirk: when the node is a replacement node (that is, not added to the node table
// yet, but can be added after another node's eviction), it will not be returned
// from nodeEntry() and we won't update its lastPongSentTime. But that shouldn't be
// a big problem, at worst it can lead to more Ping-Pongs than needed.
std::shared_ptr<NodeEntry> sourceNodeEntry = nodeEntry(_packet.sourceid);
if (sourceNodeEntry)
sourceNodeEntry->lastPongSentTime = RLPXDatagramFace::secondsSinceEpoch();

return sourceNodeEntry;
}

void NodeTable::doDiscovery()
{
m_discoveryTimer->expires_from_now(c_bucketRefreshMs);
Expand Down
10 changes: 10 additions & 0 deletions libp2p/NodeTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class NodeTable;
inline std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable);

struct NodeEntry;
struct DiscoveryDatagram;

/**
* NodeTable using modified kademlia for node discovery and preference.
Expand Down Expand Up @@ -277,6 +278,15 @@ class NodeTable : UDPSocketEvents
void onPacketReceived(
UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet) override;

std::shared_ptr<NodeEntry> handlePong(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet);
std::shared_ptr<NodeEntry> handleNeighbours(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet);
std::shared_ptr<NodeEntry> handleFindNode(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet);
std::shared_ptr<NodeEntry> handlePingNode(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet);

/// Called by m_socket when socket is disconnected.
void onSocketDisconnected(UDPSocketFace*) override {}

Expand Down

0 comments on commit dbbfd5b

Please sign in to comment.