diff --git a/src/addrman.cpp b/src/addrman.cpp
index 093b263ab3..199130c205 100644
--- a/src/addrman.cpp
+++ b/src/addrman.cpp
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include

 int CAddrInfo::GetTriedBucket(const uint256& nKey) const
 {
diff --git a/src/bench/block_assemble.cpp b/src/bench/block_assemble.cpp
index 43cc919ad1..c4355009a1 100644
--- a/src/bench/block_assemble.cpp
+++ b/src/bench/block_assemble.cpp
@@ -86,15 +86,15 @@ static void AssembleBlock(benchmark::State& state)
     InitSignatureCache();
     InitScriptExecutionCache();

-    boost::thread_group thread_group;
     CScheduler scheduler;
+    std::thread thread(std::bind(&CScheduler::serviceQueue, &scheduler));
     {
         ::pblocktree.reset(new CBlockTreeDB(1 << 20, true));
         ::pcoinsdbview.reset(new CCoinsViewDB(1 << 23, true));
         ::pcoinsTip.reset(new CCoinsViewCache(pcoinsdbview.get()));
-        thread_group.create_thread(std::bind(&CScheduler::serviceQueue, &scheduler));
         GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
+
         LoadGenesisBlock();
         CValidationState state;
         ActivateBestChain(state);
@@ -130,8 +130,9 @@ static void AssembleBlock(benchmark::State& state)
         PrepareBlock(SCRIPT_PUB);
     }

-    thread_group.interrupt_all();
-    thread_group.join_all();
+    scheduler.stop();
+    if(thread.joinable())
+        thread.join();
     GetMainSignals().FlushBackgroundCallbacks();
     GetMainSignals().UnregisterBackgroundSignalScheduler();
 }
diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp
index 79689f6e0b..29bd03b329 100644
--- a/src/bench/checkqueue.cpp
+++ b/src/bench/checkqueue.cpp
@@ -8,7 +8,6 @@
 #include
 #include
 #include

-#include

 #include
@@ -36,11 +35,8 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
         }
         void swap(PrevectorJob& x){p.swap(x.p);};
     };
-    CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};
-    boost::thread_group tg;
-    for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) {
-        tg.create_thread([&]{queue.Thread();});
-    }
+    int num_threads= GetNumCores() - 1;
+    CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE, num_threads};
     while (state.KeepRunning()) {
         // Make insecure_rand here so that each iteration is identical.
         FastRandomContext insecure_rand(true);
@@ -50,13 +46,11 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state)
             vChecks.reserve(BATCH_SIZE);
             for (size_t x = 0; x < BATCH_SIZE; ++x)
                 vChecks.emplace_back(insecure_rand);
-            control.Add(vChecks);
+            control.Add(std::move(vChecks));
         }
         // control waits for completion by RAII, but
         // it is done explicitly here for clarity
         control.Wait();
     }
-    tg.interrupt_all();
-    tg.join_all();
 }
 BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400);
diff --git a/src/checkqueue.h b/src/checkqueue.h
index 978e23a7c4..d0cc9b7b40 100644
--- a/src/checkqueue.h
+++ b/src/checkqueue.h
@@ -10,8 +10,6 @@
 #include
 #include

-#include
-#include

 template <typename T>
 class CCheckQueueControl;
@@ -31,13 +29,13 @@ class CCheckQueue
 {
 private:
     //! Mutex to protect the inner state
-    boost::mutex mutex;
+    std::mutex mutex;

     //! Worker threads block on this when out of work
-    boost::condition_variable condWorker;
+    std::condition_variable condWorker;

     //! Master thread blocks on this when out of work
-    boost::condition_variable condMaster;
+    std::condition_variable condMaster;

     //! The queue of elements to be processed.
     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
@@ -62,17 +60,20 @@ class CCheckQueue
     //! The maximum number of elements to be processed in one batch
     unsigned int nBatchSize;

+    std::vector<std::thread> m_worker_threads;
+    bool m_request_stop;
+
     /** Internal function that does bulk of the verification work. */
     bool Loop(bool fMaster = false)
     {
-        boost::condition_variable& cond = fMaster ? condMaster : condWorker;
+        std::condition_variable& cond = fMaster ? condMaster : condWorker;
         std::vector<T> vChecks;
         vChecks.reserve(nBatchSize);
         unsigned int nNow = 0;
         bool fOk = true;
         do {
             {
-                boost::unique_lock<boost::mutex> lock(mutex);
+                WaitableLock lock(mutex);
                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
                 if (nNow) {
                     fAllOk &= fOk;
@@ -85,7 +86,7 @@
                     nTotal++;
                 }
                 // logically, the do loop starts here
-                while (queue.empty()) {
+                while (queue.empty() && !m_request_stop) {
                     if (fMaster && nTodo == 0) {
                         nTotal--;
                         bool fRet = fAllOk;
@@ -99,19 +100,18 @@
                     cond.wait(lock); // wait
                     nIdle--;
                 }
+                if (m_request_stop) {
+                    return false;
+                }
                 // Decide how many work units to process now.
                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
                 //   all workers finish approximately simultaneously.
                 // * Try to account for idle jobs which will instantly start helping.
                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
-                vChecks.resize(nNow);
-                for (unsigned int i = 0; i < nNow; i++) {
-                    // We want the lock on the mutex to be as short as possible, so swap jobs from the global
-                    // queue to the local batch vector instead of copying.
-                    vChecks[i].swap(queue.back());
-                    queue.pop_back();
-                }
+                auto start_it = queue.end() - nNow;
+                vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
+                queue.erase(start_it, queue.end());
                 // Check whether we need to do work at all
                 fOk = fAllOk;
             }
@@ -125,17 +125,33 @@
 public:
     //! Mutex to ensure only one concurrent CCheckQueueControl
-    boost::mutex ControlMutex;
+    std::mutex ControlMutex;

     //! Create a new check queue
-    explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
-
-    //! Worker thread
-    void Thread()
+    explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(batch_size), m_request_stop(false)
     {
-        Loop();
+        {
+            WaitableLock loc(mutex);
+            nIdle = 0;
+            nTotal = 0;
+            fAllOk = true;
+        }
+        assert(m_worker_threads.empty());
+        for (int n = 0; n < worker_threads_num; ++n) {
+            m_worker_threads.emplace_back([this, n]() {
+                RenameThread(strprintf("scriptch.%i", n));
+                Loop(false /* worker thread */);
+            });
+        }
     }

+    // Since this class manages its own resources, which is a thread
+    // pool `m_worker_threads`, copy and move operations are not appropriate.
+    CCheckQueue(const CCheckQueue&) = delete;
+    CCheckQueue& operator=(const CCheckQueue&) = delete;
+    CCheckQueue(CCheckQueue&&) = delete;
+    CCheckQueue& operator=(CCheckQueue&&) = delete;
+
     //! Wait until execution finishes, and return whether all evaluations were successful.
     bool Wait()
     {
@@ -143,22 +159,32 @@
     }

     //! Add a batch of checks to the queue
-    void Add(std::vector<T>& vChecks)
+    void Add(std::vector<T>&& vChecks)
     {
-        boost::unique_lock<boost::mutex> lock(mutex);
-        for (T& check : vChecks) {
-            queue.push_back(T());
-            check.swap(queue.back());
+        if (vChecks.empty()) {
+            return;
+        }
+
+        {
+            WaitableLock lock(mutex);
+            queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
+            nTodo += vChecks.size();
         }
-        nTodo += vChecks.size();
+
         if (vChecks.size() == 1)
             condWorker.notify_one();
         else if (vChecks.size() > 1)
             condWorker.notify_all();
     }

+    //! Stop all of the worker threads.
     ~CCheckQueue()
     {
+        m_request_stop = true;
+        condWorker.notify_all();
+        for (std::thread& t : m_worker_threads) {
+            t.join();
+        }
     }
 };
@@ -195,10 +221,10 @@
         return fRet;
     }

-    void Add(std::vector<T>& vChecks)
+    void Add(std::vector<T>&& vChecks)
     {
         if (pqueue != nullptr)
-            pqueue->Add(vChecks);
+            pqueue->Add(std::move(vChecks));
     }

     ~CCheckQueueControl()
diff --git a/src/index/base.cpp b/src/index/base.cpp
index 21991c7e15..de2e4d535d 100644
--- a/src/index/base.cpp
+++ b/src/index/base.cpp
@@ -263,7 +263,7 @@ void BaseIndex::Start()
         return;
     }

-    m_thread_sync = std::thread(&TraceThread<std::function<void()>>, GetName(),
+    m_thread_sync = std::thread(&TraceThread, GetName(),
                                 std::bind(&BaseIndex::ThreadSync, this));
 }
diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp
index e46494e4a2..97ae45302b 100644
--- a/src/index/txindex.cpp
+++ b/src/index/txindex.cpp
@@ -152,7 +152,6 @@ bool TxIndex::DB::MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator&
     bool interrupted = false;
     std::unique_ptr<CDBIterator> cursor(block_tree_db.NewIterator());
     for (cursor->Seek(begin_key); cursor->Valid(); cursor->Next()) {
-        boost::this_thread::interruption_point();
         if (ShutdownRequested()) {
             interrupted = true;
             break;
diff --git a/src/init.cpp b/src/init.cpp
index d26047e64a..100d41c867 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -60,7 +60,8 @@
 #include
 #include
 #include
-#include
+
+#include

 #if ENABLE_ZMQ
 #include
@@ -166,7 +167,6 @@ class CCoinsViewErrorCatcher final : public CCoinsViewBacked
 static std::unique_ptr<CCoinsViewErrorCatcher> pcoinscatcher;
 static std::unique_ptr<ECCVerifyHandle> globalVerifyHandle;

-static boost::thread_group threadGroup;
 static CScheduler scheduler;

 void Interrupt()
@@ -214,10 +214,9 @@
     StopTorControl();

-    // After everything has been shut down, but before things get flushed, stop the
-    // CScheduler/checkqueue threadGroup
-    threadGroup.interrupt_all();
-    threadGroup.join_all();
+    // After everything has been shut down, but before things get flushed,
+    //stop scheduler and load block threads.
+    scheduler.stop();

     // After the threads that potentially access these pointers have been stopped,
     // destruct and reset all to nullptr.
@@ -1230,13 +1229,12 @@
     LogPrintf("Using %u threads for script verification\n", nScriptCheckThreads);
     if (nScriptCheckThreads) {
-        for (int i=0; i, "scheduler", serviceLoop));
+    scheduler.m_service_thread = std::thread(&TraceThread, "scheduler", serviceLoop);

     // Gather some entropy once per minute.
     scheduler.scheduleEvery([]{
@@ -1634,7 +1632,7 @@
         vImportFiles.push_back(strFile);
     }

-    threadGroup.create_thread(std::bind(&ThreadImport, vImportFiles, fReloadxfield));
+    scheduler.m_load_block = std::thread(std::bind(&ThreadImport, vImportFiles, fReloadxfield));

     // Wait for genesis block to be processed
     {
diff --git a/src/interfaces/node.cpp b/src/interfaces/node.cpp
index 462ce4ef01..0c6e41b974 100644
--- a/src/interfaces/node.cpp
+++ b/src/interfaces/node.cpp
@@ -41,7 +41,6 @@
 #endif

 #include
-#include
 #include

 namespace interfaces {
diff --git a/src/net.cpp b/src/net.cpp
index 669564d4e7..0574be3f4f 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1564,7 +1564,7 @@ void StartMapPort()
 {
     if (!g_upnp_thread.joinable()) {
         assert(!g_upnp_interrupt);
-        g_upnp_thread = std::thread((std::bind(&TraceThread<void (*)()>, "upnp", &ThreadMapPort)));
+        g_upnp_thread = std::thread((std::bind(&TraceThread, "upnp", &ThreadMapPort)));
     }
 }
@@ -2354,15 +2354,15 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
     }

     // Send and receive from sockets, accept connections
-    threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
+    threadSocketHandler = std::thread(&TraceThread, "net", std::bind(&CConnman::ThreadSocketHandler, this));

     if (!gArgs.GetBoolArg("-dnsseed", true))
         LogPrintf("DNS seeding disabled\n");
     else
-        threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));
+        threadDNSAddressSeed = std::thread(&TraceThread, "dnsseed", std::bind(&CConnman::ThreadDNSAddressSeed, this));

     // Initiate outbound connections from -addnode
-    threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
+    threadOpenAddedConnections = std::thread(&TraceThread, "addcon", std::bind(&CConnman::ThreadOpenAddedConnections, this));

     if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) {
         if (clientInterface) {
@@ -2373,10 +2373,10 @@
             return false;
         }
     if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty())
-        threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing)));
+        threadOpenConnections = std::thread(&TraceThread, "opencon", std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing));

     // Process messages
-    threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
+    threadMessageHandler = std::thread(&TraceThread, "msghand", std::bind(&CConnman::ThreadMessageHandler, this));

     // Dump network addresses
     scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp
index 1fc818fc1f..54d8c6509c 100644
--- a/src/rpc/blockchain.cpp
+++ b/src/rpc/blockchain.cpp
@@ -40,7 +40,6 @@
 #include
 #include
-#include // boost::thread::interrupt

 #include
 #include
@@ -839,7 +838,6 @@ static bool GetUTXOStats(CCoinsView *view, CCoinsStats &stats)
     uint256 prevkey;
     std::map<uint32_t, Coin> outputs;
     while (pcursor->Valid()) {
-        boost::this_thread::interruption_point();
         COutPoint key;
         Coin coin;
         if (pcursor->GetKey(key) && pcursor->GetValue(coin)) {
@@ -1832,7 +1830,6 @@ bool FindScriptPubKey(std::atomic<int>& scan_progress, const std::atomic<bool>&
         Coin coin;
         if (!cursor->GetKey(key) || !cursor->GetValue(coin)) return false;
         if (++count % 8192 == 0) {
-            boost::this_thread::interruption_point();
             if (should_abort) {
                 // allow to abort the scan via the abort reference
                 return false;
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index d2b882d842..9124a145d8 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -9,6 +9,7 @@
 #include
 #include
+#include

 CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
 {
@@ -20,18 +21,9 @@ CScheduler::~CScheduler()
 }

-#if BOOST_VERSION < 105000
-static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
-{
-    // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
-    // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
-    return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
-}
-#endif
-
 void CScheduler::serviceQueue()
 {
-    boost::unique_lock<boost::mutex> lock(newTaskMutex);
+    std::unique_lock<std::mutex> lock(newTaskMutex);
     ++nThreadsServicingQueue;

     // newTaskMutex is locked throughout this loop EXCEPT
@@ -40,7 +32,7 @@ void CScheduler::serviceQueue()
     while (!shouldStop()) {
         try {
             if (!shouldStop() && taskQueue.empty()) {
-                reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
+                reverse_lock<std::unique_lock<std::mutex> > rlock(lock);
             }
             while (!shouldStop() && taskQueue.empty()) {
                 // Wait until there is something to do.
@@ -49,22 +41,11 @@ void CScheduler::serviceQueue()

             // Wait until either there is a new task, or until
             // the time of the first item on the queue:
-
-// wait_until needs boost 1.50 or later; older versions have timed_wait:
-#if BOOST_VERSION < 105000
-            while (!shouldStop() && !taskQueue.empty() &&
-                   newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
-                // Keep waiting until timeout
-            }
-#else
-            // Some boost versions have a conflicting overload of wait_until that returns void.
-            // Explicitly use a template here to avoid hitting that overload.
             while (!shouldStop() && !taskQueue.empty()) {
-                boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
-                if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
+                std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
+                if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout)
                     break; // Exit loop after timeout, it means we reached the time of the event
             }
-#endif
             // If there are multiple threads, the queue can empty while we're waiting (another
             // thread may service the task we were waiting on).
             if (shouldStop() || taskQueue.empty())
@@ -76,7 +57,7 @@ void CScheduler::serviceQueue()
             {
                 // Unlock before calling f, so it can reschedule itself or another task
                 // without deadlocking:
-                reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
+                reverse_lock<std::unique_lock<std::mutex> > rlock(lock);
                 f();
             }
         } catch (...) {
@@ -91,19 +72,25 @@ void CScheduler::serviceQueue()
 void CScheduler::stop(bool drain)
 {
     {
-        boost::unique_lock<boost::mutex> lock(newTaskMutex);
+        std::unique_lock<std::mutex> lock(newTaskMutex);
         if (drain)
             stopWhenEmpty = true;
         else
             stopRequested = true;
     }
     newTaskScheduled.notify_all();
+
+    if (m_service_thread.joinable())
+        m_service_thread.join();
+
+    if(m_load_block.joinable())
+        m_load_block.join();
 }

-void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
+void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
 {
     {
-        boost::unique_lock<boost::mutex> lock(newTaskMutex);
+        std::unique_lock<std::mutex> lock(newTaskMutex);
         taskQueue.insert(std::make_pair(t, f));
     }
     newTaskScheduled.notify_one();
@@ -111,7 +98,7 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t

 void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
 {
-    schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
+    schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds));
 }

 static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
@@ -125,10 +112,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
     scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
 }

-size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
-                                boost::chrono::system_clock::time_point &last) const
+size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
+                                std::chrono::system_clock::time_point &last) const
 {
-    boost::unique_lock<boost::mutex> lock(newTaskMutex);
+    std::unique_lock<std::mutex> lock(newTaskMutex);
     size_t result = taskQueue.size();
     if (!taskQueue.empty()) {
         first = taskQueue.begin()->first;
@@ -138,7 +125,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
 }

 bool CScheduler::AreThreadsServicingQueue() const {
-    boost::unique_lock<boost::mutex> lock(newTaskMutex);
+    std::unique_lock<std::mutex> lock(newTaskMutex);
     return nThreadsServicingQueue;
 }
diff --git a/src/scheduler.h b/src/scheduler.h
index a1c064ef1e..e9cd9b24ff 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -5,14 +5,14 @@
 #ifndef BITCOIN_SCHEDULER_H
 #define BITCOIN_SCHEDULER_H

-//
-// NOTE:
-// boost::thread / boost::chrono should be ported to std::thread / std::chrono
-// when we support C++11.
-//
-#include
-#include
+
 #include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
@@ -25,7 +25,7 @@
 // CScheduler* s = new CScheduler();
 // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { }
 // s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
-// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
+// std::thread* t = new std::thread(std::bind(CScheduler::serviceQueue, s));
 //
 // ... then at program shutdown, clean up the thread running serviceQueue:
 // t->interrupt();
@@ -40,10 +40,13 @@ class CScheduler
     CScheduler();
     ~CScheduler();

+    std::thread m_service_thread;
+    std::thread m_load_block;
+
     typedef std::function<void()> Function;

     // Call func at/after time t
-    void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
+    void schedule(Function f, std::chrono::system_clock::time_point t=std::chrono::system_clock::now());

     // Convenience method: call f once deltaSeconds from now
     void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
@@ -57,8 +60,7 @@ class CScheduler
     // To keep things as simple as possible, there is no unschedule.

-    // Services the queue 'forever'. Should be run in a thread,
-    // and interrupted using boost::interrupt_thread
+    // Services the queue 'forever'. Should be run in a thread
     void serviceQueue();

     // Tell any threads running serviceQueue to stop as soon as they're
@@ -68,16 +70,16 @@ class CScheduler
     // Returns number of tasks waiting to be serviced,
     // and first and last task times
-    size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
-                        boost::chrono::system_clock::time_point &last) const;
+    size_t getQueueInfo(std::chrono::system_clock::time_point &first,
+                        std::chrono::system_clock::time_point &last) const;

     // Returns true if there are threads actively running in serviceQueue()
     bool AreThreadsServicingQueue() const;

private:
-    std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
-    boost::condition_variable newTaskScheduled;
-    mutable boost::mutex newTaskMutex;
+    std::multimap<std::chrono::system_clock::time_point, Function> taskQueue;
+    std::condition_variable newTaskScheduled;
+    mutable std::mutex newTaskMutex;
     int nThreadsServicingQueue;
     bool stopRequested;
     bool stopWhenEmpty;
diff --git a/src/sync.h b/src/sync.h
index 11c1253757..d1d5aeaa45 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -73,16 +73,18 @@ class LOCKABLE AnnotatedMixin : public PARENT
#ifdef DEBUG_LOCKORDER
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false);
void LeaveCritical();
+void CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line);
std::string LocksHeld();
void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs);
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
void DeleteLock(void* cs);
#else
-void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
-void static inline LeaveCritical() {}
-void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs) {}
-void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
-void static inline DeleteLock(void* cs) {}
+void inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
+void inline LeaveCritical() {}
+void inline CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line) {}
+void inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs) {}
+void inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
+void inline DeleteLock(void* cs) {}
#endif
#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs)
#define AssertLockNotHeld(cs) AssertLockNotHeldInternal(#cs, __FILE__, __LINE__, &cs)
@@ -178,17 +180,18 @@ class SCOPED_LOCKABLE CCriticalBlock
#define LOCK(cs) CCriticalBlock PASTE2(criticalblock, __COUNTER__)(cs, #cs, __FILE__, __LINE__)
#define LOCK2(cs1, cs2) CCriticalBlock criticalblock1(cs1, #cs1, __FILE__, __LINE__), criticalblock2(cs2, #cs2, __FILE__, __LINE__)
#define TRY_LOCK(cs, name) CCriticalBlock name(cs, #cs, __FILE__, __LINE__, true)
+#define WAIT_LOCK(cs, name) CCriticalBlock name(cs, #cs, __FILE__, __LINE__)

#define ENTER_CRITICAL_SECTION(cs)                            \
    {                                                         \
-        EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \
+        EnterCritical(#cs, __FILE__, __LINE__, (&cs)); \
        (cs).lock();                                          \
    }

#define LEAVE_CRITICAL_SECTION(cs) \
    {                              \
-        (cs).unlock();             \
-        LeaveCritical();           \
+        (cs).unlock(); \
+        LeaveCritical(); \
    }

class CSemaphore
diff --git a/src/tapyrus-cli.cpp b/src/tapyrus-cli.cpp
index 8a7f69db11..fdba571501 100644
--- a/src/tapyrus-cli.cpp
+++ b/src/tapyrus-cli.cpp
@@ -495,9 +495,6 @@ static int CommandLineRPC(int argc, char *argv[])
             }
         } while (fWait);
     }
-    catch (const boost::thread_interrupted&) {
-        throw;
-    }
     catch (const std::exception& e) {
         strPrint = std::string("error: ") + e.what();
         nRet = EXIT_FAILURE;
diff --git a/src/tapyrus-tx.cpp b/src/tapyrus-tx.cpp
index 7d5f01739c..92553f4f85 100644
--- a/src/tapyrus-tx.cpp
+++ b/src/tapyrus-tx.cpp
@@ -800,10 +800,6 @@ static int CommandLineRawTx(int argc, char* argv[])
         OutputTx(tx);
     }
-
-    catch (const boost::thread_interrupted&) {
-        throw;
-    }
     catch (const std::exception& e) {
         strPrint = std::string("error: ") + e.what();
         nRet = EXIT_FAILURE;
diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp
index 946171efe8..f29c39df59 100644
--- a/src/test/checkqueue_tests.cpp
+++ b/src/test/checkqueue_tests.cpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2012-2018 The Bitcoin Core developers
+// Copyright (c) 2012-2020 The Bitcoin Core developers
 // Copyright (c) 2019 Chaintope Inc.
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -10,7 +10,7 @@
 #include
 #include
 #include
-#include
+
 #include
 #include
 #include
@@ -26,13 +26,13 @@ BOOST_FIXTURE_TEST_SUITE(checkqueue_tests, TestingSetup)

 static const unsigned int QUEUE_BATCH_SIZE = 128;
+static const int SCRIPT_CHECK_THREADS = 3;

 struct FakeCheck {
     bool operator()()
     {
         return true;
     }
-    void swap(FakeCheck& x){};
 };

 struct FakeCheckCheckCompletion {
@@ -42,21 +42,15 @@ struct FakeCheckCheckCompletion {
         n_calls.fetch_add(1, std::memory_order_relaxed);
         return true;
     }
-    void swap(FakeCheckCheckCompletion& x){};
 };

 struct FailingCheck {
     bool fails;
     FailingCheck(bool _fails) : fails(_fails){};
-    FailingCheck() : fails(true){};
     bool operator()()
     {
         return !fails;
     }
-    void swap(FailingCheck& x)
-    {
-        std::swap(fails, x.fails);
-    };
 };

 struct UniqueCheck {
@@ -64,14 +58,12 @@ struct UniqueCheck {
     static std::unordered_multiset<size_t> results;
     size_t check_id;
     UniqueCheck(size_t check_id_in) : check_id(check_id_in){};
-    UniqueCheck() : check_id(0){};
     bool operator()()
     {
         std::lock_guard<std::mutex> l(m);
         results.insert(check_id);
         return true;
     }
-    void swap(UniqueCheck& x) { std::swap(x.check_id, check_id); };
 };
@@ -82,7 +74,6 @@ struct MemoryCheck {
     {
         return true;
     }
-    MemoryCheck(){};
     MemoryCheck(const MemoryCheck& x)
     {
         // We have to do this to make sure that destructor calls are paired
         //
         // Really, copy constructor should be aware of how many times a check
         // has been copied around.
@@ -99,7 +90,6 @@ struct MemoryCheck {
     {
         fake_allocated_memory.fetch_sub(b, std::memory_order_relaxed);
     };
-    void swap(MemoryCheck& x) { std::swap(b, x.b); };
 };

 struct FrozenCleanupCheck {
@@ -108,12 +98,12 @@ struct FrozenCleanupCheck {
     static std::mutex m;
     // Freezing can't be the default initialized behavior given how the queue
     // swaps in default initialized Checks.
-    bool should_freeze {false};
-    bool operator()()
+    bool should_freeze {true};
+    bool operator()() const
     {
         return true;
     }
-    FrozenCleanupCheck() {}
+    FrozenCleanupCheck() = default;
     ~FrozenCleanupCheck()
     {
         if (should_freeze) {
@@ -123,7 +113,17 @@ struct FrozenCleanupCheck {
             cv.wait(l, []{ return nFrozen.load(std::memory_order_relaxed) == 0;});
         }
     }
-    void swap(FrozenCleanupCheck& x){std::swap(should_freeze, x.should_freeze);};
+    FrozenCleanupCheck(FrozenCleanupCheck&& other) noexcept
+    {
+        should_freeze = other.should_freeze;
+        other.should_freeze = false;
+    }
+    FrozenCleanupCheck& operator=(FrozenCleanupCheck&& other) noexcept
+    {
+        should_freeze = other.should_freeze;
+        other.should_freeze = false;
+        return *this;
+    }
 };

 // Static Allocations
@@ -149,30 +149,25 @@ typedef CCheckQueue<FrozenCleanupCheck> FrozenCleanup_Queue;
  */
 static void Correct_Queue_range(std::vector<size_t> range)
 {
-    auto small_queue = std::unique_ptr<Correct_Queue>(new Correct_Queue {QUEUE_BATCH_SIZE});
-    boost::thread_group tg;
-    for (auto x = 0; x < nScriptCheckThreads; ++x) {
-        tg.create_thread([&]{small_queue->Thread();});
-    }
+    auto small_queue = new Correct_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
+
     // Make vChecks here to save on malloc (this test can be slow...)
     std::vector<FakeCheckCheckCompletion> vChecks;
+    vChecks.reserve(9);
     for (auto i : range) {
         size_t total = i;
         FakeCheckCheckCompletion::n_calls = 0;
-        CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue.get());
+        CCheckQueueControl<FakeCheckCheckCompletion> control(small_queue);
         while (total) {
-            vChecks.resize(std::min(total, (size_t) InsecureRandRange(10)));
+            vChecks.clear();
+            vChecks.resize(std::min(total, InsecureRandRange(10)));
             total -= vChecks.size();
-            control.Add(vChecks);
+            control.Add(std::move(vChecks));
         }
         BOOST_REQUIRE(control.Wait());
-        if (FakeCheckCheckCompletion::n_calls != i) {
-            BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
-            BOOST_TEST_MESSAGE("Failure on trial " << i << " expected, got " << FakeCheckCheckCompletion::n_calls);
-        }
+        BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i);
     }
-    tg.interrupt_all();
-    tg.join_all();
+    delete small_queue;
 }

 /** Test that 0 checks is correct
@@ -214,15 +209,10 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random)
 /** Test that failing checks are caught */
 BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
 {
-    auto fail_queue = std::unique_ptr<Failing_Queue>(new Failing_Queue {QUEUE_BATCH_SIZE});
-
-    boost::thread_group tg;
-    for (auto x = 0; x < nScriptCheckThreads; ++x) {
-        tg.create_thread([&]{fail_queue->Thread();});
-    }
+    auto fail_queue = new Failing_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};

     for (size_t i = 0; i < 1001; ++i) {
-        CCheckQueueControl<FailingCheck> control(fail_queue.get());
+        CCheckQueueControl<FailingCheck> control(fail_queue);
         size_t remaining = i;
         while (remaining) {
             size_t r = InsecureRandRange(10);
@@ -231,7 +221,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
             vChecks.reserve(r);
             for (size_t k = 0; k < r && remaining; k++, remaining--)
                 vChecks.emplace_back(remaining == 1);
-            control.Add(vChecks);
+            control.Add(std::move(vChecks));
         }
         bool success = control.Wait();
         if (i > 0) {
@@ -240,34 +230,28 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure)
             BOOST_REQUIRE(success);
         }
     }
-    tg.interrupt_all();
-    tg.join_all();
+    delete fail_queue;
 }

 // Test that a block validation which fails does not interfere with
 // future blocks, ie, the bad state is cleared.
 BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
 {
-    auto fail_queue = std::unique_ptr<Failing_Queue>(new Failing_Queue {QUEUE_BATCH_SIZE});
-    boost::thread_group tg;
-    for (auto x = 0; x < nScriptCheckThreads; ++x) {
-        tg.create_thread([&]{fail_queue->Thread();});
-    }
+    auto fail_queue = new Failing_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};

     for (auto times = 0; times < 10; ++times) {
         for (bool end_fails : {true, false}) {
-            CCheckQueueControl<FailingCheck> control(fail_queue.get());
+            CCheckQueueControl<FailingCheck> control(fail_queue);
             {
                 std::vector<FailingCheck> vChecks;
                 vChecks.resize(100, false);
                 vChecks[99] = end_fails;
-                control.Add(vChecks);
+                control.Add(std::move(vChecks));
             }
             bool r =control.Wait();
             BOOST_REQUIRE(r != end_fails);
         }
     }
-    tg.interrupt_all();
-    tg.join_all();
+    delete fail_queue;
 }

 // Test that unique checks are actually all called individually, rather than
@@ -275,32 +259,29 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure)
 // more than once as well
 BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
 {
-    auto queue = std::unique_ptr<Unique_Queue>(new Unique_Queue {QUEUE_BATCH_SIZE});
-    boost::thread_group tg;
-    for (auto x = 0; x < nScriptCheckThreads; ++x) {
-        tg.create_thread([&]{queue->Thread();});
-    }
-
+    auto queue = new Unique_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
     size_t COUNT = 100000;
     size_t total = COUNT;
     {
-        CCheckQueueControl<UniqueCheck> control(queue.get());
+        CCheckQueueControl<UniqueCheck> control(queue);
         while (total) {
             size_t r = InsecureRandRange(10);
             std::vector<UniqueCheck> vChecks;
             for (size_t k = 0; k < r && total; k++)
                 vChecks.emplace_back(--total);
-            control.Add(vChecks);
+            control.Add(std::move(vChecks));
+        }
+    }
+    {
+        WaitableLock lock(UniqueCheck::m);
+        bool r = true;
+        BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT);
+        for (size_t i = 0; i < COUNT; ++i) {
+            r = r && UniqueCheck::results.count(i) == 1;
         }
+        BOOST_REQUIRE(r);
     }
-    bool r = true;
-    BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT);
-    for (size_t i = 0; i < COUNT; ++i)
-        r = r && UniqueCheck::results.count(i) == 1;
-    BOOST_REQUIRE(r);
-    tg.interrupt_all();
-    tg.join_all();
+    delete queue;
 }
@@ -311,15 +292,12 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck)
 // time could leave the data hanging across a sequence of blocks.
 BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
 {
-    auto queue = std::unique_ptr<Memory_Queue>(new Memory_Queue {QUEUE_BATCH_SIZE});
-    boost::thread_group tg;
-    for (auto x = 0; x < nScriptCheckThreads; ++x) {
-        tg.create_thread([&]{queue->Thread();});
-    }
+    auto queue = new Memory_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
+
     for (size_t i = 0; i < 1000; ++i) {
         size_t total = i;
         {
-            CCheckQueueControl<MemoryCheck> control(queue.get());
+            CCheckQueueControl<MemoryCheck> control(queue);
             while (total) {
                 size_t r = InsecureRandRange(10);
                 std::vector<MemoryCheck> vChecks;
@@ -329,34 +307,30 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory)
                     // to catch any sort of deallocation failure
                     vChecks.emplace_back(total == 0 || total == i || total == i/2);
                 }
-                control.Add(vChecks);
+                control.Add(std::move(vChecks));
             }
         }
         BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U);
     }
-    tg.interrupt_all();
-    tg.join_all();
+    delete queue;
 }

 // Test that a new verification cannot occur until all checks
 // have been destructed
 BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
 {
-    auto queue = std::unique_ptr<FrozenCleanup_Queue>(new FrozenCleanup_Queue {QUEUE_BATCH_SIZE});
-    boost::thread_group tg;
+    auto queue = new FrozenCleanup_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
     bool fails = false;
-    for (auto x = 0; x < nScriptCheckThreads; ++x) {
-        tg.create_thread([&]{queue->Thread();});
-    }
     std::thread t0([&]() {
-        CCheckQueueControl<FrozenCleanupCheck> control(queue.get());
+        CCheckQueueControl<FrozenCleanupCheck> control(queue);
         std::vector<FrozenCleanupCheck> vChecks(1);
         // Freezing can't be the default initialized behavior given how the queue
         // swaps in default initialized Checks (otherwise freezing destructor
         // would get called twice).
         vChecks[0].should_freeze = true;
-        control.Add(vChecks);
-        control.Wait(); // Hangs here
+        control.Add(std::move(vChecks));
+        bool waitResult = control.Wait(); // Hangs here
+        assert(waitResult);
     });
     {
         std::unique_lock<std::mutex> l(FrozenCleanupCheck::m);
@@ -376,35 +350,36 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup)
     FrozenCleanupCheck::cv.notify_one();
     // Wait for control to finish
     t0.join();
-    tg.interrupt_all();
-    tg.join_all();
     BOOST_REQUIRE(!fails);
+    delete queue;
 }

 /** Test that CCheckQueueControl is threadsafe */
 BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
 {
-    auto queue = std::unique_ptr<Standard_Queue>(new Standard_Queue{QUEUE_BATCH_SIZE});
+    auto queue = new Standard_Queue{QUEUE_BATCH_SIZE, SCRIPT_CHECK_THREADS};
     {
-        boost::thread_group tg;
+        std::vector<std::thread> tg;
         std::atomic<int> nThreads {0};
         std::atomic<int> fails {0};
         for (size_t i = 0; i < 3; ++i) {
-            tg.create_thread(
+            tg.emplace_back(
                     [&]{
-                CCheckQueueControl<FakeCheck> control(queue.get());
+                CCheckQueueControl<FakeCheck> control(queue);
                 // While sleeping, no other thread should execute to this point
                 auto observed = ++nThreads;
                 MilliSleep(10);
                 fails += observed != nThreads;
                 });
         }
-        tg.join_all();
+        for (auto& thread: tg) {
+            if (thread.joinable()) thread.join();
+        }
         BOOST_REQUIRE_EQUAL(fails, 0);
     }
     {
-        boost::thread_group tg;
+        std::vector<std::thread> tg;
         std::mutex m;
         std::condition_variable cv;
         bool has_lock{false};
@@ -413,8 +388,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
         bool done_ack{false};
         {
             std::unique_lock<std::mutex> l(m);
-            tg.create_thread([&]{
-                CCheckQueueControl<FakeCheck> control(queue.get());
+            tg.emplace_back([&]{
+                CCheckQueueControl<FakeCheck> control(queue);
                 std::unique_lock<std::mutex> ll(m);
                 has_lock = true;
                 cv.notify_one();
@@ -439,8 +414,11 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
             cv.notify_one();
             BOOST_REQUIRE(!fails);
         }
-        tg.join_all();
+        for (auto& thread: tg) {
+            if (thread.joinable()) thread.join();
+        }
     }
+    delete queue;
 }
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/test/cuckoocache_tests.cpp b/src/test/cuckoocache_tests.cpp
index 9fab6d1e7f..a59bd2d6a1 100644
--- a/src/test/cuckoocache_tests.cpp
+++ b/src/test/cuckoocache_tests.cpp
@@ -3,6 +3,8 @@
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 #include
+#include
+#include
 #include
 #include