Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove boost threads #269

Merged
merged 20 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f242967
remove boost::chrono
Naviabheeman Jun 1, 2023
3efeff8
make script validation checkqueue threads use internal pool instead o…
Naviabheeman Jun 1, 2023
fb234b1
Revert "remove boost::chrono"
Naviabheeman Jun 1, 2023
e6bf363
remove boost::thread and use std::thread
Naviabheeman Jun 1, 2023
fe42797
add cmath to fix build
Naviabheeman Jun 2, 2023
d561f07
fix build - add missing headers, change boost::chrono to std chrono
Naviabheeman Jun 2, 2023
35826fd
remove static from all critical section code to fix linking error
Naviabheeman Jun 2, 2023
c8f5a22
remove CheckLastCritical from leave critical section
Naviabheeman Jun 2, 2023
a00ca8f
replace boost::interuption point with std:: yield
Naviabheeman Jun 2, 2023
ad8d47e
remove scheduling from bench assemble block
Naviabheeman Jun 3, 2023
1279321
fix hanging benchmark test
Naviabheeman Jun 3, 2023
cc7f9b1
fix test - stop scheduler threads separately.
Naviabheeman Jun 3, 2023
fa463dd
remove this_thread::yield
Naviabheeman Jun 19, 2023
1541464
review feedback - fix unused boost::thread interrupt and comments
Naviabheeman Jun 21, 2023
f0288d1
fix CI failure in CheckQueueControl. Replace c style interface with R…
Naviabheeman Jun 22, 2023
c57eb00
change scheduler thread in test_tapyrus to use the same syntax as ta…
Naviabheeman Jun 22, 2023
ff3b571
fix initialization of scheduler service thread
Naviabheeman Jun 22, 2023
1c15ee0
port commits 0682003 to 95ad70a from bitcoin to fix test hanging
Naviabheeman Jul 3, 2023
7dfc615
ifixing build - use move operations in all vchecks
Naviabheeman Jul 3, 2023
6e3dbff
review feedback - remove unused header
Naviabheeman Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/addrman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <hash.h>
#include <serialize.h>
#include <streams.h>
#include <cmath>

int CAddrInfo::GetTriedBucket(const uint256& nKey) const
{
Expand Down
9 changes: 5 additions & 4 deletions src/bench/block_assemble.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ static void AssembleBlock(benchmark::State& state)
InitSignatureCache();
InitScriptExecutionCache();

boost::thread_group thread_group;
CScheduler scheduler;
std::thread thread(std::bind(&CScheduler::serviceQueue, &scheduler));
{
::pblocktree.reset(new CBlockTreeDB(1 << 20, true));
::pcoinsdbview.reset(new CCoinsViewDB(1 << 23, true));
::pcoinsTip.reset(new CCoinsViewCache(pcoinsdbview.get()));

thread_group.create_thread(std::bind(&CScheduler::serviceQueue, &scheduler));
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);

LoadGenesisBlock();
CValidationState state;
ActivateBestChain(state);
Expand Down Expand Up @@ -130,8 +130,9 @@ static void AssembleBlock(benchmark::State& state)
PrepareBlock(SCRIPT_PUB);
}

thread_group.interrupt_all();
thread_group.join_all();
scheduler.stop();
if(thread.joinable())
thread.join();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
}
Expand Down
12 changes: 3 additions & 9 deletions src/bench/checkqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <checkqueue.h>
#include <prevector.h>
#include <vector>
#include <boost/thread/thread.hpp>
#include <random.h>


Expand Down Expand Up @@ -36,11 +35,8 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
}
void swap(PrevectorJob& x){p.swap(x.p);};
};
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};
boost::thread_group tg;
for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) {
tg.create_thread([&]{queue.Thread();});
}
int num_threads= GetNumCores() - 1;
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE, num_threads};
azuchi marked this conversation as resolved.
Show resolved Hide resolved
while (state.KeepRunning()) {
// Make insecure_rand here so that each iteration is identical.
FastRandomContext insecure_rand(true);
Expand All @@ -50,13 +46,11 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
vChecks.reserve(BATCH_SIZE);
for (size_t x = 0; x < BATCH_SIZE; ++x)
vChecks.emplace_back(insecure_rand);
control.Add(vChecks);
control.Add(std::move(vChecks));
}
// control waits for completion by RAII, but
// it is done explicitly here for clarity
control.Wait();
}
tg.interrupt_all();
tg.join_all();
}
BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400);
85 changes: 56 additions & 29 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

#include <algorithm>
#include <vector>
#include <logging.h>
azuchi marked this conversation as resolved.
Show resolved Hide resolved

#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

template <typename T>
class CCheckQueueControl;
Expand All @@ -31,13 +30,13 @@ class CCheckQueue
{
private:
//! Mutex to protect the inner state
boost::mutex mutex;
std::mutex mutex;

//! Worker threads block on this when out of work
boost::condition_variable condWorker;
std::condition_variable condWorker;

//! Master thread blocks on this when out of work
boost::condition_variable condMaster;
std::condition_variable condMaster;

//! The queue of elements to be processed.
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
Expand All @@ -62,17 +61,20 @@ class CCheckQueue
//! The maximum number of elements to be processed in one batch
unsigned int nBatchSize;

std::vector<std::thread> m_worker_threads;
bool m_request_stop;

/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex);
WaitableLock lock(mutex);
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
if (nNow) {
fAllOk &= fOk;
Expand All @@ -85,7 +87,7 @@ class CCheckQueue
nTotal++;
}
// logically, the do loop starts here
while (queue.empty()) {
while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
Expand All @@ -99,19 +101,18 @@ class CCheckQueue
cond.wait(lock); // wait
nIdle--;
}
if (m_request_stop) {
return false;
}
// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
// * Try to account for idle jobs which will instantly start helping.
// * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
vChecks.resize(nNow);
for (unsigned int i = 0; i < nNow; i++) {
// We want the lock on the mutex to be as short as possible, so swap jobs from the global
// queue to the local batch vector instead of copying.
vChecks[i].swap(queue.back());
queue.pop_back();
}
auto start_it = queue.end() - nNow;
vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
queue.erase(start_it, queue.end());
// Check whether we need to do work at all
fOk = fAllOk;
}
Expand All @@ -125,40 +126,66 @@ class CCheckQueue

public:
//! Mutex to ensure only one concurrent CCheckQueueControl
boost::mutex ControlMutex;
std::mutex ControlMutex;

//! Create a new check queue
explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}

//! Worker thread
void Thread()
explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(batch_size), m_request_stop(false)
{
Loop();
{
WaitableLock loc(mutex);
nIdle = 0;
nTotal = 0;
fAllOk = true;
}
assert(m_worker_threads.empty());
for (int n = 0; n < worker_threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
RenameThread(strprintf("scriptch.%i", n));
Loop(false /* worker thread */);
});
}
}

// Since this class manages its own resources, which is a thread
// pool `m_worker_threads`, copy and move operations are not appropriate.
CCheckQueue(const CCheckQueue&) = delete;
CCheckQueue& operator=(const CCheckQueue&) = delete;
CCheckQueue(CCheckQueue&&) = delete;
CCheckQueue& operator=(CCheckQueue&&) = delete;

//! Wait until execution finishes, and return whether all evaluations were successful.
bool Wait()
{
return Loop(true);
}

//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
void Add(std::vector<T>&& vChecks)
{
boost::unique_lock<boost::mutex> lock(mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
if (vChecks.empty()) {
return;
}

{
WaitableLock lock(mutex);
queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
nTodo += vChecks.size();
}
nTodo += vChecks.size();

if (vChecks.size() == 1)
condWorker.notify_one();
else if (vChecks.size() > 1)
condWorker.notify_all();
}

//! Stop all of the worker threads.
~CCheckQueue()
{
m_request_stop = true;
condWorker.notify_all();
for (std::thread& t : m_worker_threads) {
t.join();
}
}

};
Expand Down Expand Up @@ -195,10 +222,10 @@ class CCheckQueueControl
return fRet;
}

void Add(std::vector<T>& vChecks)
void Add(std::vector<T>&& vChecks)
{
if (pqueue != nullptr)
pqueue->Add(vChecks);
pqueue->Add(std::move(vChecks));
}

~CCheckQueueControl()
Expand Down
2 changes: 1 addition & 1 deletion src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ void BaseIndex::Start()
return;
}

m_thread_sync = std::thread(&TraceThread<std::function<void()>>, GetName(),
m_thread_sync = std::thread(&TraceThread, GetName(),
std::bind(&BaseIndex::ThreadSync, this));
}

Expand Down
1 change: 0 additions & 1 deletion src/index/txindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ bool TxIndex::DB::MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator&
bool interrupted = false;
std::unique_ptr<CDBIterator> cursor(block_tree_db.NewIterator());
for (cursor->Seek(begin_key); cursor->Valid(); cursor->Next()) {
boost::this_thread::interruption_point();
if (ShutdownRequested()) {
interrupted = true;
break;
Expand Down
18 changes: 8 additions & 10 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/thread.hpp>

#include <thread>

#if ENABLE_ZMQ
#include <zmq/zmqnotificationinterface.h>
Expand Down Expand Up @@ -166,7 +167,6 @@ class CCoinsViewErrorCatcher final : public CCoinsViewBacked
static std::unique_ptr<CCoinsViewErrorCatcher> pcoinscatcher;
static std::unique_ptr<ECCVerifyHandle> globalVerifyHandle;

static boost::thread_group threadGroup;
static CScheduler scheduler;

void Interrupt()
Expand Down Expand Up @@ -214,10 +214,9 @@ void Shutdown()

StopTorControl();

// After everything has been shut down, but before things get flushed, stop the
// CScheduler/checkqueue threadGroup
threadGroup.interrupt_all();
threadGroup.join_all();
// After everything has been shut down, but before things get flushed,
//stop scheduler and load block threads.
scheduler.stop();

// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
Expand Down Expand Up @@ -1230,13 +1229,12 @@ bool AppInitMain()

LogPrintf("Using %u threads for script verification\n", nScriptCheckThreads);
if (nScriptCheckThreads) {
for (int i=0; i<nScriptCheckThreads-1; i++)
threadGroup.create_thread(&ThreadScriptCheck);
StartScriptCheckWorkerThreads(nScriptCheckThreads);
}

// Start the lightweight task scheduler thread
CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(std::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
scheduler.m_service_thread = std::thread(&TraceThread, "scheduler", serviceLoop);

// Gather some entropy once per minute.
scheduler.scheduleEvery([]{
Expand Down Expand Up @@ -1634,7 +1632,7 @@ bool AppInitMain()
vImportFiles.push_back(strFile);
}

threadGroup.create_thread(std::bind(&ThreadImport, vImportFiles, fReloadxfield));
scheduler.m_load_block = std::thread(std::bind(&ThreadImport, vImportFiles, fReloadxfield));

// Wait for genesis block to be processed
{
Expand Down
1 change: 0 additions & 1 deletion src/interfaces/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#endif

#include <atomic>
#include <boost/thread/thread.hpp>
#include <univalue.h>

namespace interfaces {
Expand Down
12 changes: 6 additions & 6 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ void StartMapPort()
{
if (!g_upnp_thread.joinable()) {
assert(!g_upnp_interrupt);
g_upnp_thread = std::thread((std::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort)));
g_upnp_thread = std::thread((std::bind(&TraceThread, "upnp", &ThreadMapPort)));
}
}

Expand Down Expand Up @@ -2354,15 +2354,15 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
}

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
threadSocketHandler = std::thread(&TraceThread, "net", std::bind(&CConnman::ThreadSocketHandler, this));

if (!gArgs.GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n");
else
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
threadDNSAddressSeed = std::thread(&TraceThread, "dnsseed", std::bind(&CConnman::ThreadDNSAddressSeed, this));

// Initiate outbound connections from -addnode
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
threadOpenAddedConnections = std::thread(&TraceThread, "addcon", std::bind(&CConnman::ThreadOpenAddedConnections, this));

if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) {
if (clientInterface) {
Expand All @@ -2373,10 +2373,10 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
return false;
}
if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty())
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing)));
threadOpenConnections = std::thread(&TraceThread, "opencon", std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing));

// Process messages
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
threadMessageHandler = std::thread(&TraceThread, "msghand", std::bind(&CConnman::ThreadMessageHandler, this));

// Dump network addresses
scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
Expand Down
3 changes: 0 additions & 3 deletions src/rpc/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include <univalue.h>

#include <boost/algorithm/string.hpp>
#include <boost/thread/thread.hpp> // boost::thread::interrupt

#include <memory>
#include <mutex>
Expand Down Expand Up @@ -839,7 +838,6 @@ static bool GetUTXOStats(CCoinsView *view, CCoinsStats &stats)
uint256 prevkey;
std::map<uint32_t, Coin> outputs;
while (pcursor->Valid()) {
boost::this_thread::interruption_point();
COutPoint key;
Coin coin;
if (pcursor->GetKey(key) && pcursor->GetValue(coin)) {
Expand Down Expand Up @@ -1832,7 +1830,6 @@ bool FindScriptPubKey(std::atomic<int>& scan_progress, const std::atomic<bool>&
Coin coin;
if (!cursor->GetKey(key) || !cursor->GetValue(coin)) return false;
if (++count % 8192 == 0) {
boost::this_thread::interruption_point();
if (should_abort) {
// allow to abort the scan via the abort reference
return false;
Expand Down
Loading