From a939710212040221eb7f6b62343d71e7a209a826 Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Mon, 11 Feb 2019 15:00:47 +0100 Subject: [PATCH 1/9] Skip sending Ping when we've already sent one to this node --- libp2p/NodeTable.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index fbb8e6eba3c..096b1def8c3 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -69,6 +69,8 @@ void NodeTable::processEvents() bool NodeTable::addNode(Node const& _node) { + LOG(m_logger) << "Adding node " << _node; + shared_ptr entry; DEV_GUARDED(x_nodes) { @@ -90,6 +92,8 @@ bool NodeTable::addNode(Node const& _node) bool NodeTable::addKnownNode( Node const& _node, uint32_t _lastPongReceivedTime, uint32_t _lastPongSentTime) { + LOG(m_logger) << "Adding known node " << _node; + shared_ptr entry; DEV_GUARDED(x_nodes) { @@ -204,6 +208,8 @@ void NodeTable::doDiscover(NodeID _node, unsigned _round, shared_ptrhasValidEndpointProof()) { + LOG(m_logger) << "Node " << static_cast(*node) + << " endpoint proof expired."; ping(*node); continue; } @@ -311,6 +317,9 @@ void NodeTable::ping(NodeEntry const& _nodeEntry, boost::optional const& if (_ec || m_timers.isStopped()) return; + if (contains(m_sentPings, _nodeEntry.id)) + return; + NodeIPEndpoint src; src = m_hostNodeEndpoint; PingNode p(src, _nodeEntry.endpoint); @@ -330,6 +339,7 @@ void NodeTable::evict(NodeEntry const& _leastSeen, NodeEntry const& _new) if (!m_socket->isOpen()) return; + LOG(m_logger) << "Evicting node " << static_cast(_leastSeen); ping(_leastSeen, _new.id); } From 1dd33d64d8c2b19b0da7264df3a36123551a635d Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Mon, 11 Feb 2019 17:25:27 +0100 Subject: [PATCH 2/9] Add unit test for addNode the same node twice --- test/unittests/libp2p/net.cpp | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/unittests/libp2p/net.cpp b/test/unittests/libp2p/net.cpp index b8eac4bfd60..1eab25b7673 100644 --- a/test/unittests/libp2p/net.cpp +++ b/test/unittests/libp2p/net.cpp @@ -1095,6 +1095,34 @@ BOOST_AUTO_TEST_CASE(pingNotSentAfterPongForKnownNode) BOOST_REQUIRE(datagram7->typeName() == "Ping"); } +BOOST_AUTO_TEST_CASE(addNodePingsNodeOnlyOnce) +{ + // NodeTable sending PING + TestNodeTableHost nodeTableHost(0); + nodeTableHost.start(); + auto& nodeTable = nodeTableHost.nodeTable; + + // add a node to node table, initiating PING + auto const nodePort = randomPortNumber(); + auto nodeEndpoint = NodeIPEndpoint{bi::address::from_string(c_localhostIp), nodePort, nodePort}; + auto nodePubKey = KeyPair::create().pub(); + nodeTable->addNode(Node{nodePubKey, nodeEndpoint}); + + auto sentPing = nodeTable->nodeValidation(nodePubKey); + BOOST_REQUIRE(sentPing.is_initialized()); + + this_thread::sleep_for(chrono::milliseconds(2000)); + + // add it for the second time + nodeTable->addNode(Node{nodePubKey, nodeEndpoint}); + + auto sentPing2 = nodeTable->nodeValidation(nodePubKey); + BOOST_REQUIRE(sentPing2.is_initialized()); + + // check that Ping was sent only once, so Ping hash didn't change + BOOST_REQUIRE_EQUAL(sentPing->pingHash, sentPing2->pingHash); +} + BOOST_AUTO_TEST_SUITE_END() BOOST_FIXTURE_TEST_SUITE(netTypes, TestOutputHelperFixture) From 6537567fbc5793437f44f1ea04e970ac580a50fe Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Tue, 12 Feb 2019 15:05:22 +0100 Subject: [PATCH 3/9] Remove ignored replacement node in case trying to ping one evicted node several times. --- libp2p/NodeTable.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 096b1def8c3..478855c90cb 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -317,8 +317,15 @@ void NodeTable::ping(NodeEntry const& _nodeEntry, boost::optional const& if (_ec || m_timers.isStopped()) return; - if (contains(m_sentPings, _nodeEntry.id)) + // don't sent Ping if one is already sent + auto sentPing = m_sentPings.find(_nodeEntry.id); + if (sentPing != m_sentPings.end()) + { + // we don't need replacement if we're not going to ping + if (_replacementNodeID && sentPing->second.replacementNodeID != _replacementNodeID) + DEV_GUARDED(x_nodes) { m_allNodes.erase(*_replacementNodeID); } return; + } NodeIPEndpoint src; src = m_hostNodeEndpoint; From 9dcc4dab451023c9b8998b93502611f65ef7db66 Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Tue, 12 Feb 2019 18:02:37 +0100 Subject: [PATCH 4/9] Split ping() into synchronous ping() and asynchronous schedulePing() --- libp2p/NodeTable.cpp | 59 +++++++++++++++++++++++++------------------- libp2p/NodeTable.h | 6 ++++- 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 478855c90cb..9761d013f53 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -84,7 +84,7 @@ bool NodeTable::addNode(Node const& _node) return false; if (!entry->hasValidEndpointProof()) - ping(*entry); + schedulePing(*entry); return true; } @@ -105,7 +105,7 @@ bool NodeTable::addKnownNode( if (entry->hasValidEndpointProof()) noteActiveNode(entry->id, entry->endpoint); else - ping(*entry); + schedulePing(*entry); return true; } @@ -312,32 +312,30 @@ vector> NodeTable::nearestNodeEntries(NodeID _target) void NodeTable::ping(NodeEntry const& _nodeEntry, boost::optional const& _replacementNodeID) { - m_timers.schedule(0, [this, _nodeEntry, _replacementNodeID]( - boost::system::error_code const& _ec) { + // don't sent Ping if one is already sent + if (contains(m_sentPings, _nodeEntry.id)) + return; + + NodeIPEndpoint src; + src = m_hostNodeEndpoint; + PingNode p(src, _nodeEntry.endpoint); + p.ts = nextRequestExpirationTime(); + auto const pingHash = p.sign(m_secret); + LOG(m_logger) << p.typeName() << " to " << _nodeEntry.id << "@" << p.destination; + m_socket->send(p); + + m_sentPings[_nodeEntry.id] = {chrono::steady_clock::now(), pingHash, _replacementNodeID}; + if (m_nodeEventHandler && _replacementNodeID) + m_nodeEventHandler->appendEvent(_nodeEntry.id, NodeEntryScheduledForEviction); +} + +void NodeTable::schedulePing(NodeEntry const& _nodeEntry) +{ + m_timers.schedule(0, [this, _nodeEntry](boost::system::error_code const& _ec) { if (_ec || m_timers.isStopped()) return; - // don't sent Ping if one is already sent - auto sentPing = m_sentPings.find(_nodeEntry.id); - if (sentPing != m_sentPings.end()) - { - // we don't need replacement if we're not going to ping - if (_replacementNodeID && sentPing->second.replacementNodeID != _replacementNodeID) - DEV_GUARDED(x_nodes) { m_allNodes.erase(*_replacementNodeID); } - return; - } - - NodeIPEndpoint src; - src = m_hostNodeEndpoint; - PingNode p(src, _nodeEntry.endpoint); - p.ts = nextRequestExpirationTime(); - auto const pingHash = p.sign(m_secret); - LOG(m_logger) << p.typeName() << " to " << _nodeEntry.id << "@" << p.destination; - m_socket->send(p); - - m_sentPings[_nodeEntry.id] = {chrono::steady_clock::now(), pingHash, _replacementNodeID}; - if (m_nodeEventHandler && _replacementNodeID) - m_nodeEventHandler->appendEvent(_nodeEntry.id, NodeEntryScheduledForEviction); + ping(_nodeEntry, {}); }); } @@ -346,6 +344,14 @@ void NodeTable::evict(NodeEntry const& _leastSeen, NodeEntry const& _new) if (!m_socket->isOpen()) return; + // if eviction for _leastSeen already started, just forget about _new + auto sentPing = m_sentPings.find(_leastSeen.id); + if (sentPing != m_sentPings.end() && sentPing->second.replacementNodeID != _new.id) + { + DEV_GUARDED(x_nodes) { m_allNodes.erase(_new.id); } + return; + } + LOG(m_logger) << "Evicting node " << static_cast(_leastSeen); ping(_leastSeen, _new.id); } @@ -488,7 +494,10 @@ void NodeTable::onPacketReceived( auto const& optionalReplacementID = sentPing->second.replacementNodeID; if (optionalReplacementID) if (auto replacementNode = nodeEntry(*optionalReplacementID)) + { + m_sentPings.erase(replacementNode->id); dropNode(move(replacementNode)); + } m_sentPings.erase(sentPing); diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index e092d24ed66..580d7f1a064 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -205,9 +205,13 @@ class NodeTable : UDPSocketEvents /// Used to ping a node to initiate the endpoint proof. Used when contacting neighbours if they /// don't have a valid endpoint proof (see doDiscover), refreshing buckets and as part of - /// eviction process (see evict). Not synchronous - the ping operation is queued via a timer + /// eviction process (see evict). Synchronous, has to be called only from the network thread. void ping(NodeEntry const& _nodeEntry, boost::optional const& _replacementNodeID = {}); + /// Schedules ping() method to be called from the network thread. + /// Not synchronous - the ping operation is queued via a timer. + void schedulePing(NodeEntry const& _nodeEntry); + /// Used by asynchronous operations to return NodeEntry which is active and managed by node table. std::shared_ptr nodeEntry(NodeID _id); From 0daaaa5d3a64b20c9caed6abaee22561ae975419 Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Thu, 14 Feb 2019 18:20:32 +0100 Subject: [PATCH 5/9] Make m_allNodes contain only the nodes of the node table buckets. Don't put not yet validated nodes there, neither the ones that don't fit to the bucket and are replacement nodes for evicted ones. Replacement nodes are kept only in the m_sentPing items. --- libp2p/NodeTable.cpp | 255 ++++++++++++++++++---------------- libp2p/NodeTable.h | 37 ++--- test/unittests/libp2p/net.cpp | 64 ++++----- 3 files changed, 179 insertions(+), 177 deletions(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 9761d013f53..29f4e7e32a1 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -71,20 +71,21 @@ bool NodeTable::addNode(Node const& _node) { LOG(m_logger) << "Adding node " << _node; - shared_ptr entry; + if (!isValidNode(_node)) + return false; + + bool needToPing = false; DEV_GUARDED(x_nodes) { auto const it = m_allNodes.find(_node.id); - if (it == m_allNodes.end()) - entry = createNodeEntry(_node, 0, 0); - else - entry = it->second; + needToPing = (it == m_allNodes.end() || !it->second->hasValidEndpointProof()); } - if (!entry) - return false; - if (!entry->hasValidEndpointProof()) - schedulePing(*entry); + if (needToPing) + { + LOG(m_logger) << "Pending " << _node; + schedulePing(_node); + } return true; } @@ -94,58 +95,55 @@ bool NodeTable::addKnownNode( { LOG(m_logger) << "Adding known node " << _node; - shared_ptr entry; - DEV_GUARDED(x_nodes) + if (!isValidNode(_node)) + return false; + + if (nodeEntry(_node.id)) { - entry = createNodeEntry(_node, _lastPongReceivedTime, _lastPongSentTime); + LOG(m_logger) << "Node " << _node << " is already in the node table"; + return true; } - if (!entry) - return false; + + auto entry = make_shared( + m_hostNodeID, _node.id, _node.endpoint, _lastPongReceivedTime, _lastPongSentTime); if (entry->hasValidEndpointProof()) - noteActiveNode(entry->id, entry->endpoint); + { + LOG(m_logger) << "Known " << _node; + noteActiveNode(move(entry), entry->endpoint); + } else - schedulePing(*entry); + { + LOG(m_logger) << "Pending " << _node; + schedulePing(_node); + } return true; } -std::shared_ptr NodeTable::createNodeEntry( - Node const& _node, uint32_t _lastPongReceivedTime, uint32_t _lastPongSentTime) +bool NodeTable::isValidNode(Node const& _node) const { if (!_node.endpoint || !_node.id) { LOG(m_logger) << "Supplied node " << _node << " has an invalid endpoint or id. Skipping adding node to node table."; - return {}; + return false; } if (!isAllowedEndpoint(_node.endpoint)) { LOG(m_logger) << "Supplied node" << _node << " doesn't have an allowed endpoint. Skipping adding node to node table"; - return {}; + return false; } if (m_hostNodeID == _node.id) { LOG(m_logger) << "Skip adding self to node table (" << _node.id << ")"; - return {}; - } - - if (m_allNodes.find(_node.id) != m_allNodes.end()) - { - LOG(m_logger) << "Node " << _node << " is already in the node table"; - return {}; + return false; } - auto nodeEntry = make_shared( - m_hostNodeID, _node.id, _node.endpoint, _lastPongReceivedTime, _lastPongSentTime); - m_allNodes.insert({_node.id, nodeEntry}); - - LOG(m_logger) << (_lastPongReceivedTime > 0 ? "Known " : "Pending ") << _node; - - return nodeEntry; + return true; } list NodeTable::nodes() const @@ -184,7 +182,7 @@ Node NodeTable::node(NodeID const& _id) return UnspecifiedNode; } -shared_ptr NodeTable::nodeEntry(NodeID _id) +shared_ptr NodeTable::nodeEntry(NodeID const& _id) { Guard l(x_nodes); auto const it = m_allNodes.find(_id); @@ -310,118 +308,116 @@ vector> NodeTable::nearestNodeEntries(NodeID _target) return ret; } -void NodeTable::ping(NodeEntry const& _nodeEntry, boost::optional const& _replacementNodeID) +void NodeTable::ping(Node const& _node, shared_ptr _replacementNodeEntry) { - // don't sent Ping if one is already sent - if (contains(m_sentPings, _nodeEntry.id)) + // Don't sent Ping if one is already sent + if (contains(m_sentPings, _node.id)) return; NodeIPEndpoint src; src = m_hostNodeEndpoint; - PingNode p(src, _nodeEntry.endpoint); + PingNode p(src, _node.endpoint); p.ts = nextRequestExpirationTime(); auto const pingHash = p.sign(m_secret); - LOG(m_logger) << p.typeName() << " to " << _nodeEntry.id << "@" << p.destination; + LOG(m_logger) << p.typeName() << " to " << _node; m_socket->send(p); - m_sentPings[_nodeEntry.id] = {chrono::steady_clock::now(), pingHash, _replacementNodeID}; - if (m_nodeEventHandler && _replacementNodeID) - m_nodeEventHandler->appendEvent(_nodeEntry.id, NodeEntryScheduledForEviction); + m_sentPings[_node.id] = { + _node.endpoint, chrono::steady_clock::now(), pingHash, move(_replacementNodeEntry)}; } -void NodeTable::schedulePing(NodeEntry const& _nodeEntry) +void NodeTable::schedulePing(Node const& _node) { - m_timers.schedule(0, [this, _nodeEntry](boost::system::error_code const& _ec) { + m_timers.schedule(0, [this, _node](boost::system::error_code const& _ec) { if (_ec || m_timers.isStopped()) return; - ping(_nodeEntry, {}); + ping(_node, {}); }); } -void NodeTable::evict(NodeEntry const& _leastSeen, NodeEntry const& _new) +void NodeTable::evict(NodeEntry const& _leastSeen, shared_ptr _replacement) { if (!m_socket->isOpen()) return; - // if eviction for _leastSeen already started, just forget about _new - auto sentPing = m_sentPings.find(_leastSeen.id); - if (sentPing != m_sentPings.end() && sentPing->second.replacementNodeID != _new.id) - { - DEV_GUARDED(x_nodes) { m_allNodes.erase(_new.id); } - return; - } - LOG(m_logger) << "Evicting node " << static_cast(_leastSeen); - ping(_leastSeen, _new.id); + ping(_leastSeen, std::move(_replacement)); + + if (m_nodeEventHandler) + m_nodeEventHandler->appendEvent(_leastSeen.id, NodeEntryScheduledForEviction); } -void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint) +void NodeTable::noteActiveNode(shared_ptr _nodeEntry, bi::udp::endpoint const& _endpoint) { - if (_pubk == m_hostNodeID) + assert(_nodeEntry); + + if (_nodeEntry->id == m_hostNodeID) { LOG(m_logger) << "Skipping making self active."; return; } if (!isAllowedEndpoint(NodeIPEndpoint(_endpoint.address(), _endpoint.port(), _endpoint.port()))) { - LOG(m_logger) << "Skipping making node with unallowed endpoint active. Node " << _pubk - << "@" << _endpoint; + LOG(m_logger) << "Skipping making node with unallowed endpoint active. Node " + << _nodeEntry->id << "@" << _endpoint; return; } - shared_ptr newNode = nodeEntry(_pubk); - if (newNode && newNode->hasValidEndpointProof()) - { - LOG(m_logger) << "Active node " << _pubk << '@' << _endpoint; - newNode->endpoint.setAddress(_endpoint.address()); - newNode->endpoint.setUdpPort(_endpoint.port()); + if (!_nodeEntry->hasValidEndpointProof()) + return; + LOG(m_logger) << "Active node " << _nodeEntry->id << '@' << _endpoint; + // TODO don't activate in case endpoint has changed + _nodeEntry->endpoint.setAddress(_endpoint.address()); + _nodeEntry->endpoint.setUdpPort(_endpoint.port()); - shared_ptr nodeToEvict; - { - Guard l(x_state); - // Find a bucket to put a node to - NodeBucket& s = bucket_UNSAFE(newNode.get()); - auto& nodes = s.nodes; - // check if the node is already in the bucket - auto it = std::find(nodes.begin(), nodes.end(), newNode); - if (it != nodes.end()) + shared_ptr nodeToEvict; + { + Guard l(x_state); + // Find a bucket to put a node to + NodeBucket& s = bucket_UNSAFE(_nodeEntry.get()); + auto& nodes = s.nodes; + + // check if the node is already in the bucket + auto it = std::find(nodes.begin(), nodes.end(), _nodeEntry); + if (it != nodes.end()) + { + // if it was in the bucket, move it to the last position + nodes.splice(nodes.end(), nodes, it); + } + else + { + if (nodes.size() < s_bucketSize) { - // if it was in the bucket, move it to the last position - nodes.splice(nodes.end(), nodes, it); + // if it was not there, just add it as a most recently seen node + // (i.e. to the end of the list) + nodes.push_back(_nodeEntry); + DEV_GUARDED(x_nodes) { m_allNodes.insert({_nodeEntry->id, _nodeEntry}); } + if (m_nodeEventHandler) + m_nodeEventHandler->appendEvent(_nodeEntry->id, NodeEntryAdded); } else { - if (nodes.size() < s_bucketSize) + // if bucket is full, start eviction process for the least recently seen node + nodeToEvict = nodes.front().lock(); + // It could have been replaced in addNode(), then weak_ptr is expired. + // If so, just add a new one instead of expired + if (!nodeToEvict) { - // if it was not there, just add it as a most recently seen node - // (i.e. to the end of the list) - nodes.push_back(newNode); + nodes.pop_front(); + nodes.push_back(_nodeEntry); + DEV_GUARDED(x_nodes) { m_allNodes.insert({_nodeEntry->id, _nodeEntry}); } if (m_nodeEventHandler) - m_nodeEventHandler->appendEvent(newNode->id, NodeEntryAdded); - } - else - { - // if bucket is full, start eviction process for the least recently seen node - nodeToEvict = nodes.front().lock(); - // It could have been replaced in addNode(), then weak_ptr is expired. - // If so, just add a new one instead of expired - if (!nodeToEvict) - { - nodes.pop_front(); - nodes.push_back(newNode); - if (m_nodeEventHandler) - m_nodeEventHandler->appendEvent(newNode->id, NodeEntryAdded); - } + m_nodeEventHandler->appendEvent(_nodeEntry->id, NodeEntryAdded); } } } - - if (nodeToEvict) - evict(*nodeToEvict, *newNode); } + + if (nodeToEvict) + evict(*nodeToEvict, _nodeEntry); } void NodeTable::dropNode(shared_ptr _n) @@ -462,6 +458,8 @@ void NodeTable::onPacketReceived( } LOG(m_logger) << packet->typeName() << " from " << packet->sourceid << "@" << _from; + + shared_ptr sourceNodeEntry; switch (packet->packetType()) { case Pong::type: @@ -485,19 +483,20 @@ void NodeTable::onPacketReceived( return; } - auto const sourceNodeEntry = nodeEntry(sourceId); - assert(sourceNodeEntry); - sourceNodeEntry->lastPongReceivedTime = RLPXDatagramFace::secondsSinceEpoch(); - - // Valid PONG received, so we don't want to evict this node, - // and we don't need to remember replacement node anymore - auto const& optionalReplacementID = sentPing->second.replacementNodeID; - if (optionalReplacementID) - if (auto replacementNode = nodeEntry(*optionalReplacementID)) + // create or update nodeEntry with new Pong received time + DEV_GUARDED(x_nodes) + { + auto it = m_allNodes.find(sourceId); + if (it == m_allNodes.end()) + sourceNodeEntry = make_shared(m_hostNodeID, sourceId, + sentPing->second.endpoint, RLPXDatagramFace::secondsSinceEpoch(), 0); + else { - m_sentPings.erase(replacementNode->id); - dropNode(move(replacementNode)); + sourceNodeEntry = it->second; + sourceNodeEntry->lastPongReceivedTime = + RLPXDatagramFace::secondsSinceEpoch(); } + } m_sentPings.erase(sentPing); @@ -514,7 +513,16 @@ void NodeTable::onPacketReceived( case Neighbours::type: { + sourceNodeEntry = nodeEntry(packet->sourceid); + if (!sourceNodeEntry) + { + LOG(m_logger) << "Source node (" << packet->sourceid << "@" << _from + << ") not found in node table. Ignoring Neighbours packet."; + return; + } + auto const& in = dynamic_cast(*packet); + bool expected = false; auto now = chrono::steady_clock::now(); m_sentFindNodes.remove_if([&](NodeIdTimePoint const& _t) noexcept { @@ -538,7 +546,7 @@ void NodeTable::onPacketReceived( case FindNode::type: { - auto const& sourceNodeEntry = nodeEntry(packet->sourceid); + sourceNodeEntry = nodeEntry(packet->sourceid); if (!sourceNodeEntry) { LOG(m_logger) << "Source node (" << packet->sourceid << "@" << _from @@ -588,17 +596,20 @@ void NodeTable::onPacketReceived( p.sign(m_secret); m_socket->send(p); - DEV_GUARDED(x_nodes) - { - auto const it = m_allNodes.find(in.sourceid); - if (it != m_allNodes.end()) - it->second->lastPongSentTime = RLPXDatagramFace::secondsSinceEpoch(); - } + // Quirk: when the node is a replacement node (that is, not added to the node table + // yet, but can be added after another node's eviction), it will not be returned + // from nodeEntry() and we won't update its lastPongSentTime. But that shouldn't be + // a big problem, at worst it can lead to more Ping-Pongs than needed. + sourceNodeEntry = nodeEntry(packet->sourceid); + if (sourceNodeEntry) + sourceNodeEntry->lastPongSentTime = RLPXDatagramFace::secondsSinceEpoch(); + break; } } - noteActiveNode(packet->sourceid, _from); + if (sourceNodeEntry) + noteActiveNode(move(sourceNodeEntry), _from); } catch (std::exception const& _e) { @@ -650,10 +661,8 @@ void NodeTable::doHandleTimeouts() dropNode(move(node)); // save the replacement node that should be activated - if (it->second.replacementNodeID) - if (auto replacement = nodeEntry(*it->second.replacementNodeID)) - nodesToActivate.emplace_back(replacement); - + if (it->second.replacementNodeEntry) + nodesToActivate.emplace_back(move(it->second.replacementNodeEntry)); } it = m_sentPings.erase(it); @@ -664,7 +673,7 @@ void NodeTable::doHandleTimeouts() // activate replacement nodes and put them into buckets for (auto const& n : nodesToActivate) - noteActiveNode(n->id, n->endpoint); + noteActiveNode(n, n->endpoint); doHandleTimeouts(); }); diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 580d7f1a064..69d7a9d9017 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -126,14 +126,17 @@ class NodeTable : UDPSocketEvents /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored. void processEvents(); - /// Add node to the list of all nodes and ping it to trigger the endpoint proof. + /// Starts async node adding tot the node table by pinging it to trigger the endpoint proof. + /// In case the node is already in the node table, pings only if the endpoint proof expired. /// - /// @return True if the node has been added. + /// @return True if the node id valid. bool addNode(Node const& _node); /// Add node to the list of all nodes and add it to the node table. + /// In case the node's endpoint proof expired, pings it. + /// In case the nodes is already in the node table, ignores add request. /// - /// @return True if the node has been added to the table. + /// @return True if the node is valid. bool addKnownNode( Node const& _node, uint32_t _lastPongReceivedTime, uint32_t _lastPongSentTime); @@ -163,14 +166,15 @@ class NodeTable : UDPSocketEvents // protected only for derived classes in tests protected: /** - * NodeValidation is used to record the timepoint of sent PING, + * NodeValidation is used to record Pinged node's endpoint, the timepoint of sent PING, * time of sending and the new node ID to replace unresponsive node. */ struct NodeValidation { + NodeIPEndpoint endpoint; TimePoint pingSendTime; h256 pingHash; - boost::optional replacementNodeID; + std::shared_ptr replacementNodeEntry; }; /// Constants for Kademlia, derived from address space. @@ -200,20 +204,21 @@ class NodeTable : UDPSocketEvents std::list> nodes; }; - std::shared_ptr createNodeEntry( - Node const& _node, uint32_t _lastPongReceivedTime, uint32_t _lastPongSentTime); + /// @return true if the node is valid to be added to the node table. + /// (validates node ID and endpoint) + bool isValidNode(Node const& _node) const; /// Used to ping a node to initiate the endpoint proof. Used when contacting neighbours if they /// don't have a valid endpoint proof (see doDiscover), refreshing buckets and as part of /// eviction process (see evict). Synchronous, has to be called only from the network thread. - void ping(NodeEntry const& _nodeEntry, boost::optional const& _replacementNodeID = {}); + void ping(Node const& _node, std::shared_ptr _replacementNodeEntry = {}); /// Schedules ping() method to be called from the network thread. /// Not synchronous - the ping operation is queued via a timer. - void schedulePing(NodeEntry const& _nodeEntry); + void schedulePing(Node const& _node); /// Used by asynchronous operations to return NodeEntry which is active and managed by node table. - std::shared_ptr nodeEntry(NodeID _id); + std::shared_ptr nodeEntry(NodeID const& _id); /// Used to discovery nodes on network which are close to the given target. /// Sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to s_maxSteps rounds. @@ -222,12 +227,13 @@ class NodeTable : UDPSocketEvents /// Returns nodes from node table which are closest to target. std::vector> nearestNodeEntries(NodeID _target); - /// Asynchronously drops _leastSeen node if it doesn't reply and adds _new node, otherwise _new node is thrown away. - void evict(NodeEntry const& _leastSeen, NodeEntry const& _new); + /// Asynchronously drops _leastSeen node if it doesn't reply and adds _replacement node, + /// otherwise _replacement is thrown away. + void evict(NodeEntry const& _leastSeen, std::shared_ptr _replacement); /// Called whenever activity is received from a node in order to maintain node table. Only /// called for nodes for which we've completed an endpoint proof. - void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint); + void noteActiveNode(std::shared_ptr _nodeEntry, bi::udp::endpoint const& _endpoint); /// Used to drop node when timeout occurs or when evict() result is to keep previous node. void dropNode(std::shared_ptr _n); @@ -277,9 +283,8 @@ class NodeTable : UDPSocketEvents mutable Mutex x_nodes; ///< LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const. - /// Node endpoints. Includes all nodes that we've been in contact with and which haven't been - /// evicted. This includes nodes for which we both have and haven't completed the endpoint - /// proof. + /// Node endpoints. Includes all nodes that were added into node table's buckets + /// and have not been evicted yet. std::unordered_map> m_allNodes; mutable Mutex x_state; ///< LOCK x_state first if both x_nodes and x_state locks are required. diff --git a/test/unittests/libp2p/net.cpp b/test/unittests/libp2p/net.cpp index 1eab25b7673..e2ebf0f4d00 100644 --- a/test/unittests/libp2p/net.cpp +++ b/test/unittests/libp2p/net.cpp @@ -77,14 +77,10 @@ struct TestNodeTable: public NodeTable if (_count--) { // manually add node for test - { - Guard ln(x_nodes); - m_allNodes[n.first] = make_shared(m_hostNodeID, n.first, - NodeIPEndpoint(ourIp, n.second, n.second), - RLPXDatagramFace::secondsSinceEpoch(), - RLPXDatagramFace::secondsSinceEpoch()); - } - noteActiveNode(n.first, bi::udp::endpoint(ourIp, n.second)); + auto entry = make_shared(m_hostNodeID, n.first, + NodeIPEndpoint(ourIp, n.second, n.second), + RLPXDatagramFace::secondsSinceEpoch(), RLPXDatagramFace::secondsSinceEpoch()); + noteActiveNode(move(entry), bi::udp::endpoint(ourIp, n.second)); } else break; @@ -100,17 +96,12 @@ struct TestNodeTable: public NodeTable bi::address ourIp = bi::address::from_string(c_localhostIp); while (testNode != _testNodes.end()) { - unsigned distance = 0; // manually add node for test - { - Guard ln(x_nodes); - auto node(make_shared(m_hostNodeID, testNode->first, - NodeIPEndpoint(ourIp, testNode->second, testNode->second), - RLPXDatagramFace::secondsSinceEpoch(), RLPXDatagramFace::secondsSinceEpoch())); - m_allNodes[node->id] = node; - distance = node->distance; - } - noteActiveNode(testNode->first, bi::udp::endpoint(ourIp, testNode->second)); + auto node(make_shared(m_hostNodeID, testNode->first, + NodeIPEndpoint(ourIp, testNode->second, testNode->second), + RLPXDatagramFace::secondsSinceEpoch(), RLPXDatagramFace::secondsSinceEpoch())); + auto distance = node->distance; + noteActiveNode(move(node), bi::udp::endpoint(ourIp, testNode->second)); { Guard stateGuard(x_state); @@ -137,21 +128,18 @@ struct TestNodeTable: public NodeTable while (testNode != _testNodes.end() && bucketSize(_bucket) < _bucketSize) { // manually add node for test + // skip the nodes for other buckets + size_t const dist = distance(m_hostNodeID, testNode->first); + if (dist != _bucket + 1) { - // skip the nodes for other buckets - size_t const dist = distance(m_hostNodeID, testNode->first); - if (dist != _bucket + 1) - { - ++testNode; - continue; - } - - Guard ln(x_nodes); - m_allNodes[testNode->first] = make_shared(m_hostNodeID, testNode->first, - NodeIPEndpoint(ourIp, testNode->second, testNode->second), - RLPXDatagramFace::secondsSinceEpoch(), RLPXDatagramFace::secondsSinceEpoch()); + ++testNode; + continue; } - noteActiveNode(testNode->first, bi::udp::endpoint(ourIp, testNode->second)); + + auto entry = make_shared(m_hostNodeID, testNode->first, + NodeIPEndpoint(ourIp, testNode->second, testNode->second), + RLPXDatagramFace::secondsSinceEpoch(), RLPXDatagramFace::secondsSinceEpoch()); + noteActiveNode(move(entry), bi::udp::endpoint(ourIp, testNode->second)); ++testNode; } @@ -514,7 +502,7 @@ BOOST_AUTO_TEST_CASE(noteActiveNodeUpdatesKnownNode) auto& nodeTable = nodeTableHost.nodeTable; auto knownNode = nodeTable->bucketFirstNode(bucketIndex); - nodeTable->noteActiveNode(knownNode->id, knownNode->endpoint); + nodeTable->noteActiveNode(knownNode, knownNode->endpoint); // check that node was moved to the back of the bucket BOOST_CHECK_NE(nodeTable->bucketFirstNode(bucketIndex), knownNode); @@ -560,8 +548,8 @@ BOOST_AUTO_TEST_CASE(noteActiveNodeEvictsTheNodeWhenBucketIsFull) // but added to evictions auto evicted = nodeTable->nodeValidation(leastRecentlySeenNode->id); BOOST_REQUIRE(evicted.is_initialized()); - BOOST_REQUIRE(evicted->replacementNodeID); - BOOST_CHECK_EQUAL(*evicted->replacementNodeID, newNodeId); + BOOST_REQUIRE(evicted->replacementNodeEntry); + BOOST_CHECK_EQUAL(evicted->replacementNodeEntry->id, newNodeId); } BOOST_AUTO_TEST_CASE(noteActiveNodeReplacesNodeInFullBucketWhenEndpointChanged) @@ -571,21 +559,21 @@ BOOST_AUTO_TEST_CASE(noteActiveNodeReplacesNodeInFullBucketWhenEndpointChanged) BOOST_REQUIRE(bucketIndex >= 0); auto& nodeTable = nodeTableHost.nodeTable; - auto leastRecentlySeenNodeId = nodeTable->bucketFirstNode(bucketIndex)->id; + auto leastRecentlySeenNode = nodeTable->bucketFirstNode(bucketIndex); // addNode will replace the node in the m_allNodes map, because it's the same id with enother // endpoint auto const port = randomPortNumber(); NodeIPEndpoint newEndpoint{bi::address::from_string(c_localhostIp), port, port }; - nodeTable->noteActiveNode(leastRecentlySeenNodeId, newEndpoint); + nodeTable->noteActiveNode(leastRecentlySeenNode, newEndpoint); // the bucket is still max size BOOST_CHECK_EQUAL(nodeTable->bucketSize(bucketIndex), 16); // least recently seen node removed - BOOST_CHECK_NE(nodeTable->bucketFirstNode(bucketIndex)->id, leastRecentlySeenNodeId); + BOOST_CHECK_NE(nodeTable->bucketFirstNode(bucketIndex)->id, leastRecentlySeenNode->id); // but added as most recently seen with new endpoint auto mostRecentNodeEntry = nodeTable->bucketLastNode(bucketIndex); - BOOST_CHECK_EQUAL(mostRecentNodeEntry->id, leastRecentlySeenNodeId); + BOOST_CHECK_EQUAL(mostRecentNodeEntry->id, leastRecentlySeenNode->id); BOOST_CHECK_EQUAL(mostRecentNodeEntry->endpoint.address(), newEndpoint.address()); BOOST_CHECK_EQUAL(mostRecentNodeEntry->endpoint.udpPort(), newEndpoint.udpPort()); } From 6b54ecf2253af511b28164aa5019b855b7fd2f09 Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Thu, 14 Feb 2019 18:38:01 +0100 Subject: [PATCH 6/9] Address minor review issues. --- libp2p/NodeTable.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 29f4e7e32a1..85bc0ca88f3 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -312,11 +312,12 @@ void NodeTable::ping(Node const& _node, shared_ptr _replacementNodeEn { // Don't sent Ping if one is already sent if (contains(m_sentPings, _node.id)) + { + LOG(m_logger) << "Ignoring request to ping " << _node << ", because it's already pinged"; return; + } - NodeIPEndpoint src; - src = m_hostNodeEndpoint; - PingNode p(src, _node.endpoint); + PingNode p{m_hostNodeEndpoint, _node.endpoint}; p.ts = nextRequestExpirationTime(); auto const pingHash = p.sign(m_secret); LOG(m_logger) << p.typeName() << " to " << _node; From bd42df4d07317a356ee668aae8110dacddf24b25 Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Fri, 15 Feb 2019 11:14:39 +0100 Subject: [PATCH 7/9] Fix typos --- libp2p/NodeTable.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 69d7a9d9017..0d0ae1d2767 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -126,10 +126,10 @@ class NodeTable : UDPSocketEvents /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryDropped events. Events are coalesced by type whereby old events are ignored. void processEvents(); - /// Starts async node adding tot the node table by pinging it to trigger the endpoint proof. + /// Starts async node add to the node table by pinging it to trigger the endpoint proof. /// In case the node is already in the node table, pings only if the endpoint proof expired. /// - /// @return True if the node id valid. + /// @return True if the node is valid. bool addNode(Node const& _node); /// Add node to the list of all nodes and add it to the node table. From 2cdb3f48673acb55543bc202c651005e7ea917a8 Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Fri, 15 Feb 2019 12:05:50 +0100 Subject: [PATCH 8/9] Fix tests after making m_allNodes contain only node table nodes --- test/unittests/libp2p/net.cpp | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/test/unittests/libp2p/net.cpp b/test/unittests/libp2p/net.cpp index e2ebf0f4d00..87010150dd7 100644 --- a/test/unittests/libp2p/net.cpp +++ b/test/unittests/libp2p/net.cpp @@ -626,9 +626,10 @@ BOOST_AUTO_TEST_CASE(invalidPong) // wait for PONG to be received and handled nodeTable->packetsReceived.pop(); - BOOST_REQUIRE(nodeTable->nodeExists(nodePubKey)); - auto addedNode = nodeTable->nodeEntry(nodePubKey); - BOOST_CHECK_EQUAL(addedNode->lastPongReceivedTime, 0); + // pending node validation should still be not deleted + BOOST_REQUIRE(nodeTable->nodeValidation(nodePubKey)); + // node is not in the node table + BOOST_REQUIRE(!nodeTable->nodeExists(nodePubKey)); } BOOST_AUTO_TEST_CASE(validPong) @@ -950,7 +951,8 @@ BOOST_AUTO_TEST_CASE(pingFromLocalhost) BOOST_AUTO_TEST_CASE(addSelf) { - TestNodeTableHost nodeTableHost(512); + TestNodeTableHost nodeTableHost(0); + nodeTableHost.start(); auto& nodeTable = nodeTableHost.nodeTable; size_t expectedNodeCount = 0; @@ -960,28 +962,26 @@ BOOST_AUTO_TEST_CASE(addSelf) auto nodePort = nodeSocketHost.port; auto nodeEndpoint = NodeIPEndpoint{ bi::address::from_string(c_localhostIp), nodePort, nodePort }; - // Create arbitrary node and verify it can be added to the node table - auto nodeKeyPair = KeyPair::create(); - Node node(nodeKeyPair.pub(), nodeEndpoint); + // Create arbitrary node and verify it can be pinged + auto nodePubKey = KeyPair::create().pub(); + Node node(nodePubKey, nodeEndpoint); nodeTable->addNode(node); - BOOST_CHECK(nodeTable->count() == ++expectedNodeCount); + BOOST_CHECK(nodeTable->nodeValidation(nodePubKey)); - // Create self node and verify it isn't added to the node table + // Create self node and verify it isn't pinged Node self(nodeTableHost.m_alias.pub(), nodeEndpoint); nodeTable->addNode(self); - BOOST_CHECK(nodeTable->count() == ++expectedNodeCount - 1); + BOOST_CHECK(!nodeTable->nodeValidation(nodeTableHost.m_alias.pub())); } BOOST_AUTO_TEST_CASE(findNodeIsSentAfterPong) { // Node Table receiving Ping and sending FindNode - TestNodeTableHost nodeTableHost1(15); - nodeTableHost1.populate(); + TestNodeTableHost nodeTableHost1(0); nodeTableHost1.start(); auto& nodeTable1 = nodeTableHost1.nodeTable; - TestNodeTableHost nodeTableHost2(512, nodeTable1->m_hostNodeEndpoint.udpPort() + 1); - nodeTableHost2.populate(); + TestNodeTableHost nodeTableHost2(0, nodeTable1->m_hostNodeEndpoint.udpPort() + 1); nodeTableHost2.start(); auto& nodeTable2 = nodeTableHost2.nodeTable; From ceffe8025be20d2288324adfe27e72e171a1c6fc Mon Sep 17 00:00:00 2001 From: Andrei Maiboroda Date: Fri, 15 Feb 2019 13:12:35 +0100 Subject: [PATCH 9/9] Fix tests crash on clang build --- libp2p/NodeTable.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index 85bc0ca88f3..03b9d15f2b0 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -110,7 +110,7 @@ bool NodeTable::addKnownNode( if (entry->hasValidEndpointProof()) { LOG(m_logger) << "Known " << _node; - noteActiveNode(move(entry), entry->endpoint); + noteActiveNode(move(entry), _node.endpoint); } else {