Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove boost threads #269

Merged
merged 20 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f242967
remove boost::chrono
Naviabheeman Jun 1, 2023
3efeff8
make script validation checkqueue threads use internal pool instead o…
Naviabheeman Jun 1, 2023
fb234b1
Revert "remove boost::chrono"
Naviabheeman Jun 1, 2023
e6bf363
remove boost::thread and use std::thread
Naviabheeman Jun 1, 2023
fe42797
add cmath to fix build
Naviabheeman Jun 2, 2023
d561f07
fix build - add missing headers, change boost::chrono to std chrono
Naviabheeman Jun 2, 2023
35826fd
remove static from all critical section code to fix linking error
Naviabheeman Jun 2, 2023
c8f5a22
remove CheckLastCritical from leave critical section
Naviabheeman Jun 2, 2023
a00ca8f
replace boost::interuption point with std:: yield
Naviabheeman Jun 2, 2023
ad8d47e
remove scheduling from bench assemble block
Naviabheeman Jun 3, 2023
1279321
fix hanging benchmark test
Naviabheeman Jun 3, 2023
cc7f9b1
fix test - stop scheduler threads separately.
Naviabheeman Jun 3, 2023
fa463dd
remove this_thread::yield
Naviabheeman Jun 19, 2023
1541464
review feedback - fix unused boost::thread interrupt and comments
Naviabheeman Jun 21, 2023
f0288d1
fix CI failure in CheckQueueControl. Replace c style interface with R…
Naviabheeman Jun 22, 2023
c57eb00
change scheduler thread in test_tapyrus to use the same syntax as ta…
Naviabheeman Jun 22, 2023
ff3b571
fix initialization of scheduler service thread
Naviabheeman Jun 22, 2023
1c15ee0
port commits 0682003 to 95ad70a from bitcoin to fix test hanging
Naviabheeman Jul 3, 2023
7dfc615
ifixing build - use move operations in all vchecks
Naviabheeman Jul 3, 2023
6e3dbff
review feedback - remove unused header
Naviabheeman Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/addrman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <hash.h>
#include <serialize.h>
#include <streams.h>
#include <cmath>

int CAddrInfo::GetTriedBucket(const uint256& nKey) const
{
Expand Down Expand Up @@ -60,7 +61,7 @@ double CAddrInfo::GetChance(int64_t nNow) const
fChance *= 0.01;

// deprioritize 66% after each failed attempt, but at most 1/28th to avoid the search taking forever or overly penalizing outages.
fChance *= pow(0.66, std::min(nAttempts, 8));
fChance *= std::pow(0.66, std::min(nAttempts, 8));
azuchi marked this conversation as resolved.
Show resolved Hide resolved

return fChance;
}
Expand Down
9 changes: 5 additions & 4 deletions src/bench/block_assemble.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ static void AssembleBlock(benchmark::State& state)
InitSignatureCache();
InitScriptExecutionCache();

boost::thread_group thread_group;
CScheduler scheduler;
std::thread thread(std::bind(&CScheduler::serviceQueue, &scheduler));
{
::pblocktree.reset(new CBlockTreeDB(1 << 20, true));
::pcoinsdbview.reset(new CCoinsViewDB(1 << 23, true));
::pcoinsTip.reset(new CCoinsViewCache(pcoinsdbview.get()));

thread_group.create_thread(std::bind(&CScheduler::serviceQueue, &scheduler));
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);

LoadGenesisBlock();
CValidationState state;
ActivateBestChain(state);
Expand Down Expand Up @@ -130,8 +130,9 @@ static void AssembleBlock(benchmark::State& state)
PrepareBlock(SCRIPT_PUB);
}

thread_group.interrupt_all();
thread_group.join_all();
scheduler.stop();
if(thread.joinable())
thread.join();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
}
Expand Down
9 changes: 2 additions & 7 deletions src/bench/checkqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <checkqueue.h>
#include <prevector.h>
#include <vector>
#include <boost/thread/thread.hpp>
#include <random.h>


Expand Down Expand Up @@ -37,10 +36,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
void swap(PrevectorJob& x){p.swap(x.p);};
};
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};
boost::thread_group tg;
for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) {
tg.create_thread([&]{queue.Thread();});
}
queue.StartWorkerThreads(GetNumCores() - 1);
while (state.KeepRunning()) {
// Make insecure_rand here so that each iteration is identical.
FastRandomContext insecure_rand(true);
Expand All @@ -56,7 +52,6 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
// it is done explicitly here for clarity
control.Wait();
}
tg.interrupt_all();
tg.join_all();
queue.StopWorkerThreads();
}
BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400);
61 changes: 48 additions & 13 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include <algorithm>
#include <vector>

#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

template <typename T>
class CCheckQueueControl;
Expand All @@ -31,13 +29,13 @@ class CCheckQueue
{
private:
//! Mutex to protect the inner state
boost::mutex mutex;
std::mutex mutex;

//! Worker threads block on this when out of work
boost::condition_variable condWorker;
std::condition_variable condWorker;

//! Master thread blocks on this when out of work
boost::condition_variable condMaster;
std::condition_variable condMaster;

//! The queue of elements to be processed.
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
Expand All @@ -62,17 +60,20 @@ class CCheckQueue
//! The maximum number of elements to be processed in one batch
unsigned int nBatchSize;

std::vector<std::thread> m_worker_threads;
bool m_request_stop;

/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex);
WaitableLock lock(mutex);
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
if (nNow) {
fAllOk &= fOk;
Expand All @@ -85,7 +86,7 @@ class CCheckQueue
nTotal++;
}
// logically, the do loop starts here
while (queue.empty()) {
while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
Expand All @@ -99,6 +100,9 @@ class CCheckQueue
cond.wait(lock); // wait
nIdle--;
}
if (m_request_stop) {
return false;
}
// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
Expand All @@ -125,15 +129,27 @@ class CCheckQueue

public:
//! Mutex to ensure only one concurrent CCheckQueueControl
boost::mutex ControlMutex;
std::mutex ControlMutex;

//! Create a new check queue
explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}

//! Worker thread
void Thread()
//! Create a pool of new worker threads.
azuchi marked this conversation as resolved.
Show resolved Hide resolved
void StartWorkerThreads(const int threads_num)
{
Loop();
{
WaitableLock loc(mutex);
nIdle = 0;
nTotal = 0;
fAllOk = true;
}
assert(m_worker_threads.empty());
for (int n = 0; n < threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
RenameThread(strprintf("scriptch.%i", n).c_str());
Loop(false /* worker thread */);
});
}
}

//! Wait until execution finishes, and return whether all evaluations were successful.
Expand All @@ -145,7 +161,7 @@ class CCheckQueue
//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
{
boost::unique_lock<boost::mutex> lock(mutex);
WaitableLock lock(mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
Expand All @@ -157,8 +173,27 @@ class CCheckQueue
condWorker.notify_all();
}

//! Stop all of the worker threads.
azuchi marked this conversation as resolved.
Show resolved Hide resolved
void StopWorkerThreads()
{
{
WaitableLock lock(mutex);
m_request_stop = true;
}
condWorker.notify_all();
for (std::thread& t : m_worker_threads) {
t.join();
}
m_worker_threads.clear();
{
WaitableLock lock(mutex);
m_request_stop = false;
}
}

~CCheckQueue()
{
assert(m_worker_threads.empty());
}

};
Expand Down
1 change: 0 additions & 1 deletion src/index/txindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ bool TxIndex::DB::MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator&
bool interrupted = false;
std::unique_ptr<CDBIterator> cursor(block_tree_db.NewIterator());
for (cursor->Seek(begin_key); cursor->Valid(); cursor->Next()) {
boost::this_thread::interruption_point();
if (ShutdownRequested()) {
interrupted = true;
break;
Expand Down
21 changes: 11 additions & 10 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/thread.hpp>

#include <thread>

#if ENABLE_ZMQ
#include <zmq/zmqnotificationinterface.h>
Expand Down Expand Up @@ -166,7 +167,6 @@ class CCoinsViewErrorCatcher final : public CCoinsViewBacked
static std::unique_ptr<CCoinsViewErrorCatcher> pcoinscatcher;
static std::unique_ptr<ECCVerifyHandle> globalVerifyHandle;

static boost::thread_group threadGroup;
static CScheduler scheduler;

void Interrupt()
Expand Down Expand Up @@ -214,10 +214,12 @@ void Shutdown()

StopTorControl();

// After everything has been shut down, but before things get flushed, stop the
// CScheduler/checkqueue threadGroup
threadGroup.interrupt_all();
threadGroup.join_all();
// After everything has been shut down, but before things get flushed,
//1. stop the checkqueue threads.
StopScriptCheckWorkerThreads();

//2. stop scheduler and load block threads.
scheduler.stop();

// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
Expand Down Expand Up @@ -1230,13 +1232,12 @@ bool AppInitMain()

LogPrintf("Using %u threads for script verification\n", nScriptCheckThreads);
if (nScriptCheckThreads) {
for (int i=0; i<nScriptCheckThreads-1; i++)
threadGroup.create_thread(&ThreadScriptCheck);
StartScriptCheckWorkerThreads(nScriptCheckThreads);
}

// Start the lightweight task scheduler thread
CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(std::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
scheduler.m_service_thread = std::thread(TraceThread<CScheduler::Function>, "scheduler", serviceLoop);

// Gather some entropy once per minute.
scheduler.scheduleEvery([]{
Expand Down Expand Up @@ -1634,7 +1635,7 @@ bool AppInitMain()
vImportFiles.push_back(strFile);
}

threadGroup.create_thread(std::bind(&ThreadImport, vImportFiles, fReloadxfield));
scheduler.m_load_block = std::thread(std::bind(&ThreadImport, vImportFiles, fReloadxfield));

// Wait for genesis block to be processed
{
Expand Down
1 change: 0 additions & 1 deletion src/interfaces/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#endif

#include <atomic>
#include <boost/thread/thread.hpp>
#include <univalue.h>

namespace interfaces {
Expand Down
3 changes: 0 additions & 3 deletions src/rpc/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include <univalue.h>

#include <boost/algorithm/string.hpp>
#include <boost/thread/thread.hpp> // boost::thread::interrupt

#include <memory>
#include <mutex>
Expand Down Expand Up @@ -839,7 +838,6 @@ static bool GetUTXOStats(CCoinsView *view, CCoinsStats &stats)
uint256 prevkey;
std::map<uint32_t, Coin> outputs;
while (pcursor->Valid()) {
boost::this_thread::interruption_point();
COutPoint key;
Coin coin;
if (pcursor->GetKey(key) && pcursor->GetValue(coin)) {
Expand Down Expand Up @@ -1832,7 +1830,6 @@ bool FindScriptPubKey(std::atomic<int>& scan_progress, const std::atomic<bool>&
Coin coin;
if (!cursor->GetKey(key) || !cursor->GetValue(coin)) return false;
if (++count % 8192 == 0) {
boost::this_thread::interruption_point();
if (should_abort) {
// allow to abort the scan via the abort reference
return false;
Expand Down
Loading