Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #5523 from ethereum/capability-timers
Browse files Browse the repository at this point in the history
Fix use of timers when managing capability background work loops
  • Loading branch information
gumb0 authored Mar 26, 2019
2 parents ad6f54f + 105cd3d commit 3c6c4e5
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 303 deletions.
126 changes: 55 additions & 71 deletions libethereum/EthereumCapability.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#include "EthereumCapability.h"
#include "BlockChain.h"
#include "BlockChainSync.h"
#include "BlockQueue.h"
#include "TransactionQueue.h"
#include <libdevcore/Common.h>
#include <libethcore/Exceptions.h>
#include <libp2p/Common.h>
#include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <chrono>
Expand All @@ -31,17 +18,19 @@ using namespace std;
using namespace dev;
using namespace dev::eth;

static unsigned const c_maxSendTransactions = 256;
static unsigned const c_maxHeadersToSend = 1024;
static unsigned const c_maxIncomingNewHashes = 1024;
static int const c_backroundWorkPeriodMs = 1000;
static int const c_minBlockBroadcastPeers = 4;

char const* const EthereumCapability::s_stateNames[static_cast<int>(SyncState::Size)] = {
char const* const EthereumCapability::c_stateNames[static_cast<int>(SyncState::Size)] = {
"NotSynced", "Idle", "Waiting", "Blocks", "State"};

std::chrono::milliseconds constexpr EthereumCapability::c_backgroundWorkInterval;

namespace
{
constexpr unsigned c_maxSendTransactions = 256;
constexpr unsigned c_maxHeadersToSend = 1024;
constexpr unsigned c_maxIncomingNewHashes = 1024;
constexpr unsigned c_peerTimeoutSeconds = 10;
constexpr int c_minBlockBroadcastPeers = 4;

string toString(Asking _a)
{
switch (_a)
Expand Down Expand Up @@ -410,15 +399,9 @@ EthereumCapability::EthereumCapability(shared_ptr<p2p::CapabilityHostFace> _host
m_urng = std::mt19937_64(seed());
}

void EthereumCapability::onStarting()
{
m_backgroundWorkEnabled = true;
m_host->scheduleExecution(c_backroundWorkPeriodMs, [this]() { doBackgroundWork(); });
}

void EthereumCapability::onStopping()
std::chrono::milliseconds EthereumCapability::backgroundWorkInterval() const
{
m_backgroundWorkEnabled = false;
return c_backgroundWorkInterval;
}

bool EthereumCapability::ensureInitialised()
Expand All @@ -441,7 +424,7 @@ void EthereumCapability::reset()

// reset() can be called from RPC handling thread,
// but we access m_latestBlockSent and m_transactionsSent only from the network thread
m_host->scheduleExecution(0, [this]() {
m_host->postWork([this]() {
m_latestBlockSent = h256();
m_transactionsSent.clear();
});
Expand All @@ -452,43 +435,6 @@ void EthereumCapability::completeSync()
m_sync->completeSync();
}

void EthereumCapability::doBackgroundWork()
{
ensureInitialised();
auto h = m_chain.currentHash();
// If we've finished our initial sync (including getting all the blocks into the chain so as to reduce invalid transactions), start trading transactions & blocks
if (!isSyncing() && m_chain.isKnown(m_latestBlockSent))
{
if (m_newTransactions)
{
m_newTransactions = false;
maintainTransactions();
}
if (m_newBlocks)
{
m_newBlocks = false;
maintainBlocks(h);
}
}

time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
if (now - m_lastTick >= 1)
{
m_lastTick = now;
for (auto const& peer : m_peers)
{
time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());

if (now - peer.second.lastAsk() > 10 && peer.second.isConversing())
// timeout
m_host->disconnect(peer.first, p2p::PingTimeout);
}
}

if (m_backgroundWorkEnabled)
m_host->scheduleExecution(c_backroundWorkPeriodMs, [this]() { doBackgroundWork(); });
}

void EthereumCapability::maintainTransactions()
{
// Send any new transactions.
Expand All @@ -499,6 +445,8 @@ void EthereumCapability::maintainTransactions()
{
auto const& t = ts[i];
bool unsent = !m_transactionsSent.count(t.sha3());

// Build list of peers to send transactions to
auto const peers = selectPeers([&](EthereumPeer const& _peer) {
return _peer.isWaitingForTransactions() ||
(unsent && !_peer.isTransactionKnown(t.sha3()));
Expand All @@ -510,6 +458,7 @@ void EthereumCapability::maintainTransactions()
m_transactionsSent.insert(t.sha3());
}

// Send transactions to peers
for (auto& peer : m_peers)
{
bytes b;
Expand Down Expand Up @@ -646,7 +595,7 @@ SyncStatus EthereumCapability::status() const
void EthereumCapability::onTransactionImported(
ImportResult _ir, h256 const& _h, h512 const& _nodeId)
{
m_host->scheduleExecution(0, [this, _ir, _h, _nodeId]() {
m_host->postWork([this, _ir, _h, _nodeId]() {
auto itPeerStatus = m_peers.find(_nodeId);
if (itPeerStatus == m_peers.end())
return;
Expand Down Expand Up @@ -900,6 +849,41 @@ bool EthereumCapability::interpretCapabilityPacket(
return true;
}

void EthereumCapability::doBackgroundWork()
{
ensureInitialised();
auto h = m_chain.currentHash();
// If we've finished our initial sync (including getting all the blocks into the chain so as to
// reduce invalid transactions), start trading transactions & blocks
if (!isSyncing() && m_chain.isKnown(m_latestBlockSent))
{
if (m_newTransactions)
{
m_newTransactions = false;
maintainTransactions();
}
if (m_newBlocks)
{
m_newBlocks = false;
maintainBlocks(h);
}
}

time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
if (now - m_lastTick >= 1)
{
m_lastTick = now;
for (auto const& peer : m_peers)
{
time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

if (now - peer.second.lastAsk() > c_peerTimeoutSeconds && peer.second.isConversing())
// timeout
m_host->disconnect(peer.first, p2p::PingTimeout);
}
}
}

void EthereumCapability::setIdle(NodeID const& _peerID)
{
setAsking(_peerID, Asking::Nothing);
Expand Down
38 changes: 13 additions & 25 deletions libethereum/EthereumCapability.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#pragma once

Expand Down Expand Up @@ -103,10 +90,9 @@ class EthereumCapability : public p2p::CapabilityFace

std::string name() const override { return "eth"; }
unsigned version() const override { return c_protocolVersion; }
p2p::CapDesc descriptor() const override { return {name(), version()}; }
unsigned messageCount() const override { return PacketCount; }

void onStarting() override;
void onStopping() override;
std::chrono::milliseconds backgroundWorkInterval() const override;

unsigned protocolVersion() const { return c_protocolVersion; }
u256 networkId() const { return m_networkId; }
Expand All @@ -128,31 +114,35 @@ class EthereumCapability : public p2p::CapabilityFace
BlockQueue const& bq() const { return m_bq; }
SyncStatus status() const;

static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }
static char const* stateName(SyncState _s) { return c_stateNames[static_cast<int>(_s)]; }

static unsigned const c_oldProtocolVersion;

void onConnect(NodeID const& _nodeID, u256 const& _peerCapabilityVersion) override;
void onDisconnect(NodeID const& _nodeID) override;
bool interpretCapabilityPacket(NodeID const& _peerID, unsigned _id, RLP const& _r) override;

/// Main work loop - sends new transactions and blocks to available peers and disconnects from
/// timed out peers
void doBackgroundWork() override;

p2p::CapabilityHostFace& capabilityHost() { return *m_host; }

EthereumPeer const& peer(NodeID const& _peerID) const;
EthereumPeer& peer(NodeID const& _peerID);
void disablePeer(NodeID const& _peerID, std::string const& _problem);

private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
static char const* const c_stateNames[static_cast<int>(SyncState::Size)];
static constexpr std::chrono::milliseconds c_backgroundWorkInterval{1000};

std::vector<NodeID> selectPeers(
std::function<bool(EthereumPeer const&)> const& _predicate) const;

std::pair<std::vector<NodeID>, std::vector<NodeID>> randomPartitionPeers(
std::vector<NodeID> const& _peers, std::size_t _number) const;

void doBackgroundWork();

/// Send top transactions (by nonce and gas price) to available peers
void maintainTransactions();
void maintainBlocks(h256 const& _currentBlock);
void onTransactionImported(ImportResult _ir, h256 const& _h, h512 const& _nodeId);
Expand Down Expand Up @@ -192,8 +182,6 @@ class EthereumCapability : public p2p::CapabilityFace

std::unordered_map<NodeID, EthereumPeer> m_peers;

std::atomic<bool> m_backgroundWorkEnabled = {false};

mutable std::mt19937_64 m_urng; // Mersenne Twister psuedo-random number generator

Logger m_logger{createLogger(VerbosityDebug, "ethcap")};
Expand Down
63 changes: 21 additions & 42 deletions libethereum/WarpCapability.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#include "WarpCapability.h"
#include "BlockChain.h"
Expand All @@ -26,10 +13,12 @@ namespace dev
{
namespace eth
{

std::chrono::milliseconds constexpr WarpCapability::c_backgroundWorkInterval;

namespace
{
static size_t const c_freePeerBufferSize = 32;
static int const c_backroundWorkPeriodMs = 1000;

bool validateManifest(RLP const& _manifestRlp)
{
Expand Down Expand Up @@ -323,15 +312,9 @@ WarpCapability::WarpCapability(std::shared_ptr<p2p::CapabilityHostFace> _host,
{
}

void WarpCapability::onStarting()
std::chrono::milliseconds WarpCapability::backgroundWorkInterval() const
{
m_backgroundWorkEnabled = true;
m_host->scheduleExecution(c_backroundWorkPeriodMs, [this]() { doBackgroundWork(); });
}

void WarpCapability::onStopping()
{
m_backgroundWorkEnabled = false;
return c_backgroundWorkInterval;
}

std::shared_ptr<WarpPeerObserverFace> WarpCapability::createPeerObserver(
Expand All @@ -340,23 +323,6 @@ std::shared_ptr<WarpPeerObserverFace> WarpCapability::createPeerObserver(
return std::make_shared<WarpPeerObserver>(*this, m_blockChain, _snapshotDownloadPath);
}

void WarpCapability::doBackgroundWork()
{
for (auto const& peer : m_peers)
{
time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto const& status = peer.second;
if (now - status.m_lastAsk > 10 && status.m_asking != Asking::Nothing)
{
// timeout
m_host->disconnect(peer.first, p2p::PingTimeout);
}
}

if (m_backgroundWorkEnabled)
m_host->scheduleExecution(c_backroundWorkPeriodMs, [this]() { doBackgroundWork(); });
}

void WarpCapability::onConnect(NodeID const& _peerID, u256 const& /* _peerCapabilityVersion */)
{
m_peers.emplace(_peerID, WarpPeerStatus{});
Expand Down Expand Up @@ -487,6 +453,19 @@ void WarpCapability::onDisconnect(NodeID const& _peerID)
m_peers.erase(_peerID);
}

void WarpCapability::doBackgroundWork()
{
for (auto const& peer : m_peers)
{
time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto const& status = peer.second;
if (now - status.m_lastAsk > 10 && status.m_asking != Asking::Nothing)
{
// timeout
m_host->disconnect(peer.first, p2p::PingTimeout);
}
}
}

void WarpCapability::requestStatus(NodeID const& _peerID, unsigned _hostProtocolVersion,
u256 const& _hostNetworkId, u256 const& _chainTotalDifficulty, h256 const& _chainCurrentHash,
Expand Down
Loading

0 comments on commit 3c6c4e5

Please sign in to comment.