From ff3b5711a556ed146ec44418ac1265056e38f1d4 Mon Sep 17 00:00:00 2001 From: Navia Bheeman Date: Thu, 22 Jun 2023 16:42:08 +0530 Subject: [PATCH] fix initialization of scheduler service thread initialize m_request_stop variable in the constructor and handle empty queue length separately --- src/checkqueue.h | 27 +++++++++++---------- src/index/base.cpp | 2 +- src/init.cpp | 6 ++--- src/net.cpp | 12 +++++----- src/test/checkqueue_tests.cpp | 45 +++++++++++++++++++++-------------- src/test/test_tapyrus.cpp | 5 +++- src/torcontrol.cpp | 2 +- src/util.cpp | 42 +++++++++++++++++++++++--------- src/util.h | 24 +++---------------- src/validation.cpp | 1 - 10 files changed, 91 insertions(+), 75 deletions(-) diff --git a/src/checkqueue.h b/src/checkqueue.h index cb41892621..d61524096d 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -9,6 +9,7 @@ #include #include +#include template @@ -132,8 +133,7 @@ class CCheckQueue std::mutex ControlMutex; //! Create a new check queue - explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) - : nBatchSize(batch_size) + explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(batch_size), m_request_stop(false) { { WaitableLock loc(mutex); @@ -144,7 +144,7 @@ class CCheckQueue assert(m_worker_threads.empty()); for (int n = 0; n < worker_threads_num; ++n) { m_worker_threads.emplace_back([this, n]() { - RenameThread(strprintf("scriptch.%i", n).c_str()); + RenameThread(strprintf("scriptch.%i", n)); Loop(false /* worker thread */); }); } @@ -166,12 +166,18 @@ class CCheckQueue //! Add a batch of checks to the queue void Add(std::vector& vChecks) { - WaitableLock lock(mutex); - for (T& check : vChecks) { - queue.push_back(T()); - check.swap(queue.back()); + if (vChecks.empty()) { + return; } - nTodo += vChecks.size(); + + { + WaitableLock lock(mutex); + for (T& check : vChecks) { + queue.push_back(T()); + check.swap(queue.back()); + } + } + if (vChecks.size() == 1) condWorker.notify_one(); else if (vChecks.size() > 1) @@ -181,10 +187,7 @@ class CCheckQueue //! Stop all of the worker threads. ~CCheckQueue() { - { - WaitableLock lock(mutex); - m_request_stop = true; - } + m_request_stop = true; condWorker.notify_all(); for (std::thread& t : m_worker_threads) { t.join(); diff --git a/src/index/base.cpp b/src/index/base.cpp index 21991c7e15..de2e4d535d 100644 --- a/src/index/base.cpp +++ b/src/index/base.cpp @@ -263,7 +263,7 @@ void BaseIndex::Start() return; } - m_thread_sync = std::thread(&TraceThread>, GetName(), + m_thread_sync = std::thread(&TraceThread, GetName(), std::bind(&BaseIndex::ThreadSync, this)); } diff --git a/src/init.cpp b/src/init.cpp index 53178a3875..100d41c867 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -214,8 +214,8 @@ void Shutdown() StopTorControl(); - // After everything has been shut down, but before things get flushed - //2. stop scheduler and load block threads. + // After everything has been shut down, but before things get flushed, + //stop scheduler and load block threads. scheduler.stop(); // After the threads that potentially access these pointers have been stopped, @@ -1234,7 +1234,7 @@ bool AppInitMain() // Start the lightweight task scheduler thread CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler); - scheduler.m_service_thread = std::thread(TraceThread, "scheduler", serviceLoop); + scheduler.m_service_thread = std::thread(&TraceThread, "scheduler", serviceLoop); // Gather some entropy once per minute. scheduler.scheduleEvery([]{ diff --git a/src/net.cpp b/src/net.cpp index 669564d4e7..0574be3f4f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1564,7 +1564,7 @@ void StartMapPort() { if (!g_upnp_thread.joinable()) { assert(!g_upnp_interrupt); - g_upnp_thread = std::thread((std::bind(&TraceThread, "upnp", &ThreadMapPort))); + g_upnp_thread = std::thread((std::bind(&TraceThread, "upnp", &ThreadMapPort))); } } @@ -2354,15 +2354,15 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) } // Send and receive from sockets, accept connections - threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); + threadSocketHandler = std::thread(&TraceThread, "net", std::bind(&CConnman::ThreadSocketHandler, this)); if (!gArgs.GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else - threadDNSAddressSeed = std::thread(&TraceThread >, "dnsseed", std::function(std::bind(&CConnman::ThreadDNSAddressSeed, this))); + threadDNSAddressSeed = std::thread(&TraceThread, "dnsseed", std::bind(&CConnman::ThreadDNSAddressSeed, this)); // Initiate outbound connections from -addnode - threadOpenAddedConnections = std::thread(&TraceThread >, "addcon", std::function(std::bind(&CConnman::ThreadOpenAddedConnections, this))); + threadOpenAddedConnections = std::thread(&TraceThread, "addcon", std::bind(&CConnman::ThreadOpenAddedConnections, this)); if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) { if (clientInterface) { @@ -2373,10 +2373,10 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) return false; } if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty()) - threadOpenConnections = std::thread(&TraceThread >, "opencon", std::function(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing))); + threadOpenConnections = std::thread(&TraceThread, "opencon", std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing)); // Process messages - threadMessageHandler = std::thread(&TraceThread >, "msghand", std::function(std::bind(&CConnman::ThreadMessageHandler, this))); + threadMessageHandler = std::thread(&TraceThread, "msghand", std::bind(&CConnman::ThreadMessageHandler, this)); // Dump network addresses scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000); diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 103a3362ff..459ff83791 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -104,7 +104,7 @@ struct MemoryCheck { }; struct FrozenCleanupCheck { - static std::atomic nFrozen; + static volatile std::atomic nFrozen; static std::condition_variable cv; static std::mutex m; // Freezing can't be the default initialized behavior given how the queue @@ -129,7 +129,7 @@ struct FrozenCleanupCheck { // Static Allocations std::mutex FrozenCleanupCheck::m{}; -std::atomic FrozenCleanupCheck::nFrozen{0}; +volatile std::atomic FrozenCleanupCheck::nFrozen{0}; std::condition_variable FrozenCleanupCheck::cv{}; std::mutex UniqueCheck::m; std::unordered_multiset UniqueCheck::results; @@ -150,23 +150,25 @@ typedef CCheckQueue FrozenCleanup_Queue; */ static void Correct_Queue_range(std::vector range) { - auto small_queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto small_queue = new Correct_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; + // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; for (auto i : range) { size_t total = i; FakeCheckCheckCompletion::n_calls = 0; - CCheckQueueControl control(small_queue.get()); + CCheckQueueControl control(small_queue); while (total) { vChecks.resize(std::min(total, (size_t) InsecureRandRange(10))); total -= vChecks.size(); control.Add(vChecks); } - BOOST_CHECK(control.Wait()); + BOOST_REQUIRE(control.Wait()); if (FakeCheckCheckCompletion::n_calls != i) { BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); } } + delete small_queue; } /** Test that 0 checks is correct @@ -208,9 +210,10 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) /** Test that failing checks are caught */ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { - auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto fail_queue = new Failing_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; + for (size_t i = 0; i < 1001; ++i) { - CCheckQueueControl control(fail_queue.get()); + CCheckQueueControl control(fail_queue); size_t remaining = i; while (remaining) { size_t r = InsecureRandRange(10); @@ -228,16 +231,17 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) BOOST_REQUIRE(success); } } + delete fail_queue; } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { - auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto fail_queue = new Failing_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; for (auto times = 0; times < 10; ++times) { for (bool end_fails : {true, false}) { - CCheckQueueControl control(fail_queue.get()); + CCheckQueueControl control(fail_queue); { std::vector vChecks; vChecks.resize(100, false); @@ -248,6 +252,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_REQUIRE(r != end_fails); } } + delete fail_queue; } // Test that unique checks are actually all called individually, rather than @@ -255,11 +260,11 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) // more than once as well BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto queue = new Unique_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; size_t COUNT = 100000; size_t total = COUNT; { - CCheckQueueControl control(queue.get()); + CCheckQueueControl control(queue); while (total) { size_t r = InsecureRandRange(10); std::vector vChecks; @@ -277,6 +282,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) } BOOST_REQUIRE(r); } + delete queue; } @@ -287,12 +293,12 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) // time could leave the data hanging across a sequence of blocks. BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto queue = new Memory_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; for (size_t i = 0; i < 1000; ++i) { size_t total = i; { - CCheckQueueControl control(queue.get()); + CCheckQueueControl control(queue); while (total) { size_t r = InsecureRandRange(10); std::vector vChecks; @@ -307,16 +313,17 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } + delete queue; } // Test that a new verification cannot occur until all checks // have been destructed BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto queue = new FrozenCleanup_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; bool fails = false; std::thread t0([&]() { - CCheckQueueControl control(queue.get()); + CCheckQueueControl control(queue); std::vector vChecks(1); // Freezing can't be the default initialized behavior given how the queue // swaps in default initialized Checks (otherwise freezing destructor @@ -345,13 +352,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) // Wait for control to finish t0.join(); BOOST_REQUIRE(!fails); + delete queue; } /** Test that CCheckQueueControl is threadsafe */ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { - auto queue = std::make_unique(QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS); + auto queue = new Standard_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS}; { std::vector tg; std::atomic nThreads {0}; @@ -359,7 +367,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) for (size_t i = 0; i < 3; ++i) { tg.emplace_back( [&]{ - CCheckQueueControl control(queue.get()); + CCheckQueueControl control(queue); // While sleeping, no other thread should execute to this point auto observed = ++nThreads; MilliSleep(10); @@ -382,7 +390,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { std::unique_lock l(m); tg.emplace_back([&]{ - CCheckQueueControl control(queue.get()); + CCheckQueueControl control(queue); std::unique_lock ll(m); has_lock = true; cv.notify_one(); @@ -411,6 +419,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) if (thread.joinable()) thread.join(); } } + delete queue; } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/test_tapyrus.cpp b/src/test/test_tapyrus.cpp index 9d46326aff..f4afa05c96 100644 --- a/src/test/test_tapyrus.cpp +++ b/src/test/test_tapyrus.cpp @@ -18,6 +18,9 @@ #include #include