Skip to content

Commit

Permalink
fix initialization of scheduler service thread
Browse files Browse the repository at this point in the history
initialize m_request_stop variable in the constructor and handle  empty queue length separately
  • Loading branch information
Naviabheeman committed Jul 3, 2023
1 parent c57eb00 commit ff3b571
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 75 deletions.
27 changes: 15 additions & 12 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <algorithm>
#include <vector>
#include <logging.h>


template <typename T>
Expand Down Expand Up @@ -132,8 +133,7 @@ class CCheckQueue
std::mutex ControlMutex;

//! Create a new check queue
explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
: nBatchSize(batch_size)
explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(batch_size), m_request_stop(false)
{
{
WaitableLock loc(mutex);
Expand All @@ -144,7 +144,7 @@ class CCheckQueue
assert(m_worker_threads.empty());
for (int n = 0; n < worker_threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
RenameThread(strprintf("scriptch.%i", n).c_str());
RenameThread(strprintf("scriptch.%i", n));
Loop(false /* worker thread */);
});
}
Expand All @@ -166,12 +166,18 @@ class CCheckQueue
//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
{
WaitableLock lock(mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
if (vChecks.empty()) {
return;
}
nTodo += vChecks.size();

{
WaitableLock lock(mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
}
}

if (vChecks.size() == 1)
condWorker.notify_one();
else if (vChecks.size() > 1)
Expand All @@ -181,10 +187,7 @@ class CCheckQueue
//! Stop all of the worker threads.
~CCheckQueue()
{
{
WaitableLock lock(mutex);
m_request_stop = true;
}
m_request_stop = true;
condWorker.notify_all();
for (std::thread& t : m_worker_threads) {
t.join();
Expand Down
2 changes: 1 addition & 1 deletion src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ void BaseIndex::Start()
return;
}

m_thread_sync = std::thread(&TraceThread<std::function<void()>>, GetName(),
m_thread_sync = std::thread(&TraceThread, GetName(),
std::bind(&BaseIndex::ThreadSync, this));
}

Expand Down
6 changes: 3 additions & 3 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ void Shutdown()

StopTorControl();

// After everything has been shut down, but before things get flushed
//2. stop scheduler and load block threads.
// After everything has been shut down, but before things get flushed,
//stop scheduler and load block threads.
scheduler.stop();

// After the threads that potentially access these pointers have been stopped,
Expand Down Expand Up @@ -1234,7 +1234,7 @@ bool AppInitMain()

// Start the lightweight task scheduler thread
CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler);
scheduler.m_service_thread = std::thread(TraceThread<CScheduler::Function>, "scheduler", serviceLoop);
scheduler.m_service_thread = std::thread(&TraceThread, "scheduler", serviceLoop);

// Gather some entropy once per minute.
scheduler.scheduleEvery([]{
Expand Down
12 changes: 6 additions & 6 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ void StartMapPort()
{
if (!g_upnp_thread.joinable()) {
assert(!g_upnp_interrupt);
g_upnp_thread = std::thread((std::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort)));
g_upnp_thread = std::thread((std::bind(&TraceThread, "upnp", &ThreadMapPort)));
}
}

Expand Down Expand Up @@ -2354,15 +2354,15 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
}

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
threadSocketHandler = std::thread(&TraceThread, "net", std::bind(&CConnman::ThreadSocketHandler, this));

if (!gArgs.GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n");
else
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
threadDNSAddressSeed = std::thread(&TraceThread, "dnsseed", std::bind(&CConnman::ThreadDNSAddressSeed, this));

// Initiate outbound connections from -addnode
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
threadOpenAddedConnections = std::thread(&TraceThread, "addcon", std::bind(&CConnman::ThreadOpenAddedConnections, this));

if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) {
if (clientInterface) {
Expand All @@ -2373,10 +2373,10 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
return false;
}
if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty())
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing)));
threadOpenConnections = std::thread(&TraceThread, "opencon", std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing));

// Process messages
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
threadMessageHandler = std::thread(&TraceThread, "msghand", std::bind(&CConnman::ThreadMessageHandler, this));

// Dump network addresses
scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
Expand Down
45 changes: 27 additions & 18 deletions src/test/checkqueue_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct MemoryCheck {
};

struct FrozenCleanupCheck {
static std::atomic<uint64_t> nFrozen;
static volatile std::atomic<uint64_t> nFrozen;
static std::condition_variable cv;
static std::mutex m;
// Freezing can't be the default initialized behavior given how the queue
Expand All @@ -129,7 +129,7 @@ struct FrozenCleanupCheck {

// Static Allocations
std::mutex FrozenCleanupCheck::m{};
std::atomic<uint64_t> FrozenCleanupCheck::nFrozen{0};
volatile std::atomic<uint64_t> FrozenCleanupCheck::nFrozen{0};
std::condition_variable FrozenCleanupCheck::cv{};
std::mutex UniqueCheck::m;
std::unordered_multiset<size_t> UniqueCheck::results;
Expand All @@ -150,23 +150,25 @@ typedef CCheckQueue<FrozenCleanupCheck> FrozenCleanup_Queue;
*/
static void Correct_Queue_range(std::vector<size_t> range)
{
auto small_queue = std::make_unique<Correct_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto small_queue = new Correct_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};

// Make vChecks here to save on malloc (this test can be slow...)
std::vector<FakeCheckCheckCompletion> vChecks;
for (auto i : range) {
size_t total = i;
FakeCheckCheckCompletion::n_calls = 0;
CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue.get());
CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue);
while (total) {
vChecks.resize(std::min(total, (size_t) InsecureRandRange(10)));
total -= vChecks.size();
control.Add(vChecks);
}
BOOST_CHECK(control.Wait());
BOOST_REQUIRE(control.Wait());
if (FakeCheckCheckCompletion::n_calls != i) {
BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
}
}
delete small_queue;
}

/** Test that 0 checks is correct
Expand Down Expand Up @@ -208,9 +210,10 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random)
/** Test that failing checks are caught */
BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
{
auto fail_queue = std::make_unique<Failing_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto fail_queue = new Failing_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};

for (size_t i = 0; i < 1001; ++i) {
CCheckQueueControl<FailingCheck> control(fail_queue.get());
CCheckQueueControl<FailingCheck> control(fail_queue);
size_t remaining = i;
while (remaining) {
size_t r = InsecureRandRange(10);
Expand All @@ -228,16 +231,17 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
BOOST_REQUIRE(success);
}
}
delete fail_queue;
}
// Test that a block validation which fails does not interfere with
// future blocks, ie, the bad state is cleared.
BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
{
auto fail_queue = std::make_unique<Failing_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto fail_queue = new Failing_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};

for (auto times = 0; times < 10; ++times) {
for (bool end_fails : {true, false}) {
CCheckQueueControl<FailingCheck> control(fail_queue.get());
CCheckQueueControl<FailingCheck> control(fail_queue);
{
std::vector<FailingCheck> vChecks;
vChecks.resize(100, false);
Expand All @@ -248,18 +252,19 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
BOOST_REQUIRE(r != end_fails);
}
}
delete fail_queue;
}

// Test that unique checks are actually all called individually, rather than
// just one check being called repeatedly. Test that checks are not called
// more than once as well
BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
{
auto queue = std::make_unique<Unique_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto queue = new Unique_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
size_t COUNT = 100000;
size_t total = COUNT;
{
CCheckQueueControl<UniqueCheck> control(queue.get());
CCheckQueueControl<UniqueCheck> control(queue);
while (total) {
size_t r = InsecureRandRange(10);
std::vector<UniqueCheck> vChecks;
Expand All @@ -277,6 +282,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
}
BOOST_REQUIRE(r);
}
delete queue;
}


Expand All @@ -287,12 +293,12 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
// time could leave the data hanging across a sequence of blocks.
BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
{
auto queue = std::make_unique<Memory_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto queue = new Memory_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};

for (size_t i = 0; i < 1000; ++i) {
size_t total = i;
{
CCheckQueueControl<MemoryCheck> control(queue.get());
CCheckQueueControl<MemoryCheck> control(queue);
while (total) {
size_t r = InsecureRandRange(10);
std::vector<MemoryCheck> vChecks;
Expand All @@ -307,16 +313,17 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
}
BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U);
}
delete queue;
}

// Test that a new verification cannot occur until all checks
// have been destructed
BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
{
auto queue = std::make_unique<FrozenCleanup_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto queue = new FrozenCleanup_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
bool fails = false;
std::thread t0([&]() {
CCheckQueueControl<FrozenCleanupCheck> control(queue.get());
CCheckQueueControl<FrozenCleanupCheck> control(queue);
std::vector<FrozenCleanupCheck> vChecks(1);
// Freezing can't be the default initialized behavior given how the queue
// swaps in default initialized Checks (otherwise freezing destructor
Expand Down Expand Up @@ -345,21 +352,22 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
// Wait for control to finish
t0.join();
BOOST_REQUIRE(!fails);
delete queue;
}


/** Test that CCheckQueueControl is threadsafe */
BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
{
auto queue = std::make_unique<Standard_Queue>(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS);
auto queue = new Standard_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
{
std::vector<std::thread> tg;
std::atomic<int> nThreads {0};
std::atomic<int> fails {0};
for (size_t i = 0; i < 3; ++i) {
tg.emplace_back(
[&]{
CCheckQueueControl<FakeCheck> control(queue.get());
CCheckQueueControl<FakeCheck> control(queue);
// While sleeping, no other thread should execute to this point
auto observed = ++nThreads;
MilliSleep(10);
Expand All @@ -382,7 +390,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
{
std::unique_lock<std::mutex> l(m);
tg.emplace_back([&]{
CCheckQueueControl<FakeCheck> control(queue.get());
CCheckQueueControl<FakeCheck> control(queue);
std::unique_lock<std::mutex> ll(m);
has_lock = true;
cv.notify_one();
Expand Down Expand Up @@ -411,6 +419,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
if (thread.joinable()) thread.join();
}
}
delete queue;
}
BOOST_AUTO_TEST_SUITE_END()

5 changes: 4 additions & 1 deletion src/test/test_tapyrus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include <rpc/register.h>
#include <script/sigcache.h>
#include <xfieldhistory.h>
#include <util.h>

#include <thread>

constexpr unsigned int CPubKey::SCHNORR_SIGNATURE_SIZE;

Expand Down Expand Up @@ -99,7 +102,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
// We have to run a scheduler thread to prevent ActivateBestChain
// from blocking due to queue overrun.
CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler);
scheduler.m_service_thread = std::thread(TraceThread<CScheduler::Function>, "scheduler", serviceLoop);
scheduler.m_service_thread = std::thread(&TraceThread, "scheduler", serviceLoop);
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);

mempool.setSanityCheck(1.0);
Expand Down
2 changes: 1 addition & 1 deletion src/torcontrol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ void StartTorControl()
return;
}

torControlThread = std::thread(std::bind(&TraceThread<void (*)()>, "torcontrol", &TorControlThread));
torControlThread = std::thread(std::bind(&TraceThread, "torcontrol", &TorControlThread));
}

void InterruptTorControl()
Expand Down
Loading

0 comments on commit ff3b571

Please sign in to comment.