Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove boost threads #269

Merged
merged 20 commits into from
Jul 5, 2023
Merged
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f242967
remove boost::chrono
Naviabheeman Jun 1, 2023
3efeff8
make script validation checkqueue threads use internal pool instead o…
Naviabheeman Jun 1, 2023
fb234b1
Revert "remove boost::chrono"
Naviabheeman Jun 1, 2023
e6bf363
remove boost::thread and use std::thread
Naviabheeman Jun 1, 2023
fe42797
add cmath to fix build
Naviabheeman Jun 2, 2023
d561f07
fix build - add missing headers, change boost::chrono to std chrono
Naviabheeman Jun 2, 2023
35826fd
remove static from all critical section code to fix linking error
Naviabheeman Jun 2, 2023
c8f5a22
remove CheckLastCritical from leave critical section
Naviabheeman Jun 2, 2023
a00ca8f
replace boost::interuption point with std:: yield
Naviabheeman Jun 2, 2023
ad8d47e
remove scheduling from bench assemble block
Naviabheeman Jun 3, 2023
1279321
fix hanging benchmark test
Naviabheeman Jun 3, 2023
cc7f9b1
fix test - stop scheduler threads separately.
Naviabheeman Jun 3, 2023
fa463dd
remove this_thread::yield
Naviabheeman Jun 19, 2023
1541464
review feedback - fix unused boost::thread interrupt and comments
Naviabheeman Jun 21, 2023
f0288d1
fix CI failure in CheckQueueControl. Replace c style interface with R…
Naviabheeman Jun 22, 2023
c57eb00
change scheduler thread in test_tapyrus to use the same syntax as ta…
Naviabheeman Jun 22, 2023
ff3b571
fix initialization of scheduler service thread
Naviabheeman Jun 22, 2023
1c15ee0
port commits 0682003 to 95ad70a from bitcoin to fix test hanging
Naviabheeman Jul 3, 2023
7dfc615
ifixing build - use move operations in all vchecks
Naviabheeman Jul 3, 2023
6e3dbff
review feedback - remove unused header
Naviabheeman Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make script validation checkqueue threads use internal pool instead o…
…f global scheduler pool bitcoin#15632
  • Loading branch information
Naviabheeman committed Jun 20, 2023
commit 3efeff868494235d4e95c7676a31cfe20a8c7ddb
9 changes: 2 additions & 7 deletions src/bench/checkqueue.cpp
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@
#include <checkqueue.h>
#include <prevector.h>
#include <vector>
#include <boost/thread/thread.hpp>
#include <random.h>


@@ -37,10 +36,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
void swap(PrevectorJob& x){p.swap(x.p);};
};
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};
boost::thread_group tg;
for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) {
tg.create_thread([&]{queue.Thread();});
}
queue.StartWorkerThreads(GetNumCores() - 1);
while (state.KeepRunning()) {
// Make insecure_rand here so that each iteration is identical.
FastRandomContext insecure_rand(true);
@@ -56,7 +52,6 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
// it is done explicitly here for clarity
control.Wait();
}
tg.interrupt_all();
tg.join_all();
queue.StopWorkerThreads();
}
BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400);
69 changes: 52 additions & 17 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
@@ -10,8 +10,6 @@
#include <algorithm>
#include <vector>

#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

template <typename T>
class CCheckQueueControl;
@@ -31,48 +29,51 @@ class CCheckQueue
{
private:
//! Mutex to protect the inner state
boost::mutex mutex;
std::mutex mutex;

//! Worker threads block on this when out of work
boost::condition_variable condWorker;
std::condition_variable condWorker;

//! Master thread blocks on this when out of work
boost::condition_variable condMaster;
std::condition_variable condMaster;

//! The queue of elements to be processed.
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
std::vector<T> queue;
std::vector<T> queue GUARDED_BY(mutex);

//! The number of workers (including the master) that are idle.
int nIdle;
int nIdle GUARDED_BY(mutex){0};

//! The total number of workers (including the master).
int nTotal;
int nTotal GUARDED_BY(mutex){0};

//! The temporary evaluation result.
bool fAllOk;
bool fAllOk GUARDED_BY(mutex){true};

/**
* Number of verifications that haven't completed yet.
* This includes elements that are no longer queued, but still in the
* worker's own batches.
*/
unsigned int nTodo;
unsigned int nTodo GUARDED_BY(mutex){0};

//! The maximum number of elements to be processed in one batch
unsigned int nBatchSize;

std::vector<std::thread> m_worker_threads;
bool m_request_stop GUARDED_BY(mutex){false};

/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex);
WaitableLock lock(mutex);
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
if (nNow) {
fAllOk &= fOk;
@@ -85,7 +86,7 @@ class CCheckQueue
nTotal++;
}
// logically, the do loop starts here
while (queue.empty()) {
while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
@@ -99,6 +100,9 @@ class CCheckQueue
cond.wait(lock); // wait
nIdle--;
}
if (m_request_stop) {
return false;
}
// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
@@ -130,10 +134,22 @@ class CCheckQueue
//! Create a new check queue
explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}

//! Worker thread
void Thread()
//! Create a pool of new worker threads.
void StartWorkerThreads(const int threads_num)
{
Loop();
{
WaitableLock loc(mutex);
nIdle = 0;
nTotal = 0;
fAllOk = true;
}
assert(m_worker_threads.empty());
for (int n = 0; n < threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
RenameThread(strprintf("scriptch.%i", n).c_str());
Loop(false /* worker thread */);
});
}
}

//! Wait until execution finishes, and return whether all evaluations were successful.
@@ -145,7 +161,7 @@ class CCheckQueue
//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
{
boost::unique_lock<boost::mutex> lock(mutex);
WaitableLock lock(mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
@@ -157,8 +173,27 @@ class CCheckQueue
condWorker.notify_all();
}

//! Stop all of the worker threads.
void StopWorkerThreads()
{
{
WaitableLock lock(mutex);
m_request_stop = true;
}
condWorker.notify_all();
for (std::thread& t : m_worker_threads) {
t.join();
}
m_worker_threads.clear();
{
WaitableLock lock(mutex);
m_request_stop = false;
}
}

~CCheckQueue()
{
assert(m_worker_threads.empty());
}

};
4 changes: 2 additions & 2 deletions src/init.cpp
Original file line number Diff line number Diff line change
@@ -218,6 +218,7 @@ void Shutdown()
// CScheduler/checkqueue threadGroup
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads();

// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
@@ -1230,8 +1231,7 @@ bool AppInitMain()

LogPrintf("Using %u threads for script verification\n", nScriptCheckThreads);
if (nScriptCheckThreads) {
for (int i=0; i<nScriptCheckThreads-1; i++)
threadGroup.create_thread(&ThreadScriptCheck);
StartScriptCheckWorkerThreads(nScriptCheckThreads);
}

// Start the lightweight task scheduler thread
87 changes: 33 additions & 54 deletions src/test/checkqueue_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright (c) 2012-2018 The Bitcoin Core developers
// Copyright (c) 2019 Chaintope Inc.
// Copyright (c) 2012-2020 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

@@ -10,7 +9,8 @@
#include <test/test_tapyrus.h>
#include <checkqueue.h>
#include <boost/test/unit_test.hpp>
#include <boost/thread.hpp>
#include <boost/thread/thread.hpp>

#include <atomic>
#include <thread>
#include <vector>
@@ -26,6 +26,7 @@
BOOST_FIXTURE_TEST_SUITE(checkqueue_tests, TestingSetup)

static const unsigned int QUEUE_BATCH_SIZE = 128;
static const int SCRIPT_CHECK_THREADS = 3;

struct FakeCheck {
bool operator()()
@@ -149,11 +150,8 @@ typedef CCheckQueue<FrozenCleanupCheck> FrozenCleanup_Queue;
*/
static void Correct_Queue_range(std::vector<size_t> range)
{
auto small_queue = std::unique_ptr<Correct_Queue>(new Correct_Queue {QUEUE_BATCH_SIZE});
boost::thread_group tg;
for (auto x = 0; x < nScriptCheckThreads; ++x) {
tg.create_thread([&]{small_queue->Thread();});
}
auto small_queue = std::unique_ptr<Correct_Queue>(new Correct_Queue{QUEUE_BATCH_SIZE});
small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
// Make vChecks here to save on malloc (this test can be slow...)
std::vector<FakeCheckCheckCompletion> vChecks;
for (auto i : range) {
@@ -168,11 +166,9 @@ static void Correct_Queue_range(std::vector<size_t> range)
BOOST_REQUIRE(control.Wait());
if (FakeCheckCheckCompletion::n_calls != i) {
BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
BOOST_TEST_MESSAGE("Failure on trial " << i << " expected, got " << FakeCheckCheckCompletion::n_calls);
}
}
tg.interrupt_all();
tg.join_all();
small_queue->StopWorkerThreads();
}

/** Test that 0 checks is correct
@@ -214,12 +210,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random)
/** Test that failing checks are caught */
BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
{
auto fail_queue = std::unique_ptr<Failing_Queue>(new Failing_Queue {QUEUE_BATCH_SIZE});

boost::thread_group tg;
for (auto x = 0; x < nScriptCheckThreads; ++x) {
tg.create_thread([&]{fail_queue->Thread();});
}
auto fail_queue = std::unique_ptr<Failing_Queue>(new Failing_Queue{QUEUE_BATCH_SIZE});
fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);

for (size_t i = 0; i < 1001; ++i) {
CCheckQueueControl<FailingCheck> control(fail_queue.get());
@@ -240,18 +232,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
BOOST_REQUIRE(success);
}
}
tg.interrupt_all();
tg.join_all();
fail_queue->StopWorkerThreads();
}
// Test that a block validation which fails does not interfere with
// future blocks, ie, the bad state is cleared.
BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
{
auto fail_queue = std::unique_ptr<Failing_Queue>(new Failing_Queue {QUEUE_BATCH_SIZE});
boost::thread_group tg;
for (auto x = 0; x < nScriptCheckThreads; ++x) {
tg.create_thread([&]{fail_queue->Thread();});
}
auto fail_queue = std::unique_ptr<Failing_Queue>(new Failing_Queue{QUEUE_BATCH_SIZE});
fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);

for (auto times = 0; times < 10; ++times) {
for (bool end_fails : {true, false}) {
@@ -266,21 +254,16 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
BOOST_REQUIRE(r != end_fails);
}
}
tg.interrupt_all();
tg.join_all();
fail_queue->StopWorkerThreads();
}

// Test that unique checks are actually all called individually, rather than
// just one check being called repeatedly. Test that checks are not called
// more than once as well
BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
{
auto queue = std::unique_ptr<Unique_Queue>(new Unique_Queue {QUEUE_BATCH_SIZE});
boost::thread_group tg;
for (auto x = 0; x < nScriptCheckThreads; ++x) {
tg.create_thread([&]{queue->Thread();});

}
auto queue = std::unique_ptr<Unique_Queue>(new Unique_Queue{QUEUE_BATCH_SIZE});
queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);

size_t COUNT = 100000;
size_t total = COUNT;
@@ -294,13 +277,16 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
control.Add(vChecks);
}
}
bool r = true;
BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT);
for (size_t i = 0; i < COUNT; ++i)
r = r && UniqueCheck::results.count(i) == 1;
BOOST_REQUIRE(r);
tg.interrupt_all();
tg.join_all();
{
WaitableLock lock(UniqueCheck::m);
bool r = true;
BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT);
for (size_t i = 0; i < COUNT; ++i) {
r = r && UniqueCheck::results.count(i) == 1;
}
BOOST_REQUIRE(r);
}
queue->StopWorkerThreads();
}


@@ -311,11 +297,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
// time could leave the data hanging across a sequence of blocks.
BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
{
auto queue = std::unique_ptr<Memory_Queue>(new Memory_Queue {QUEUE_BATCH_SIZE});
boost::thread_group tg;
for (auto x = 0; x < nScriptCheckThreads; ++x) {
tg.create_thread([&]{queue->Thread();});
}
auto queue = std::unique_ptr<Memory_Queue>(new Memory_Queue{QUEUE_BATCH_SIZE});
queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
for (size_t i = 0; i < 1000; ++i) {
size_t total = i;
{
@@ -334,20 +317,16 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
}
BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U);
}
tg.interrupt_all();
tg.join_all();
queue->StopWorkerThreads();
}

// Test that a new verification cannot occur until all checks
// have been destructed
BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
{
auto queue = std::unique_ptr<FrozenCleanup_Queue>(new FrozenCleanup_Queue {QUEUE_BATCH_SIZE});
boost::thread_group tg;
auto queue = std::unique_ptr<FrozenCleanup_Queue>(new FrozenCleanup_Queue{QUEUE_BATCH_SIZE});
bool fails = false;
for (auto x = 0; x < nScriptCheckThreads; ++x) {
tg.create_thread([&]{queue->Thread();});
}
queue->StartWorkerThreads(SCRIPT_CHECK_THREADS);
std::thread t0([&]() {
CCheckQueueControl<FrozenCleanupCheck> control(queue.get());
std::vector<FrozenCleanupCheck> vChecks(1);
@@ -356,7 +335,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
// would get called twice).
vChecks[0].should_freeze = true;
control.Add(vChecks);
control.Wait(); // Hangs here
bool waitResult = control.Wait(); // Hangs here
assert(waitResult);
});
{
std::unique_lock<std::mutex> l(FrozenCleanupCheck::m);
@@ -376,9 +356,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
FrozenCleanupCheck::cv.notify_one();
// Wait for control to finish
t0.join();
tg.interrupt_all();
tg.join_all();
BOOST_REQUIRE(!fails);
queue->StopWorkerThreads();
}


Loading