From 76877fe7459604f386f0cbde4a58377def47d3d8 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Thu, 7 Apr 2022 19:45:44 -0700 Subject: [PATCH 01/28] Initial take on adding QuRT thread support to TVM's thread pool. WIP; crashes --- src/runtime/threading_backend.cc | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 748b0b035094..4de75f276874 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -34,6 +34,8 @@ #endif #if defined(__hexagon__) #include +#include +#define HEXAGON_STACK_SIZE 65536 #endif #include #include @@ -41,6 +43,38 @@ namespace tvm { namespace runtime { namespace threading { +#if defined(__hexagon__) +class QuRTThread { + public: + //template + //explicit QuRTThread(Function&& f) { + QuRTThread(std::function worker_callback, int worker_id) : + f(worker_callback), i(worker_id) { + qurt_thread_attr_t attr; + qurt_thread_attr_init(&attr); + stack = malloc(HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_addr(&attr, stack); + qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); + } + ~QuRTThread() { + free(stack); + } + bool joinable() const { return qurt_thread_get_id() != thread; } + void join() { + int status; + qurt_thread_join(thread, &status); + } + private: + static void run_func(QuRTThread * t) { + t->f(t->i); + } + qurt_thread_t thread; + void * stack; + std::function f; + int i; +}; +#endif thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: @@ -48,7 +82,11 @@ class ThreadGroup::Impl { : num_workers_(num_workers) { ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads."; for (int i = exclude_worker0; i < num_workers_; ++i) { +#ifdef __hexagon__ + threads_.emplace_back(QuRTThread(worker_callback, i)); +#else threads_.emplace_back([worker_callback, i] { worker_callback(i); }); +#endif // __hexagon__ } InitSortedOrder(); } @@ -116,6 +154,7 @@ class ThreadGroup::Impl { // if worker 0 is offloaded to main, i.e. exclude_worker0 is true, // the main thread is bound to core 0. void SetAffinity(bool exclude_worker0, AffinityMode mode) { +#ifndef __hexagon__ const char* val = getenv("TVM_BIND_THREADS"); if (val != nullptr && atoi(val) != 1) { return; @@ -172,6 +211,7 @@ class ThreadGroup::Impl { SetMasterThreadFullCpuAffinity(mode); } } + #endif // __hexagon__ } void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { @@ -185,6 +225,7 @@ class ThreadGroup::Impl { // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE, // our implementation will use kBig mode by default and will let main thread // run on intended cores. + #ifndef __hexagon__ std::vector ids; switch (mode) { case kSpecifyOneCorePerThread: @@ -206,6 +247,7 @@ class ThreadGroup::Impl { break; } SetThreadAffinity(thread, ids); + #endif // __hexagon__ } void SetMasterThreadFullCpuAffinity(AffinityMode mode) { @@ -259,7 +301,11 @@ class ThreadGroup::Impl { } int num_workers_; +#if defined(__hexagon__) + std::vector threads_; +#else std::vector threads_; +#endif std::vector sorted_order_; int big_count_ = 0; int little_count_ = 0; From 260563488946ae0033d275beecb30ffaadb88c75 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 13:35:49 -0700 Subject: [PATCH 02/28] Allocate QuRT thread stacks automatically --- src/runtime/threading_backend.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 4de75f276874..803af6b40a9e 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -52,14 +52,10 @@ class QuRTThread { f(worker_callback), i(worker_id) { qurt_thread_attr_t attr; qurt_thread_attr_init(&attr); - stack = malloc(HEXAGON_STACK_SIZE); - qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); qurt_thread_attr_set_stack_addr(&attr, stack); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); } - ~QuRTThread() { - free(stack); - } bool joinable() const { return qurt_thread_get_id() != thread; } void join() { int status; @@ -73,6 +69,7 @@ class QuRTThread { void * stack; std::function f; int i; + uint8_t stack[HEXAGON_STACK_SIZE]; }; #endif thread_local int max_concurrency = 0; From 82bad79a4eff6437bea296aae441f64d17762ae3 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 13:37:04 -0700 Subject: [PATCH 03/28] Remove duplicate stack in QuRTThread --- src/runtime/threading_backend.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 803af6b40a9e..54d7d5054082 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -66,7 +66,6 @@ class QuRTThread { t->f(t->i); } qurt_thread_t thread; - void * stack; std::function f; int i; uint8_t stack[HEXAGON_STACK_SIZE]; From cb07bd3ef7eccd605b8203f17a49e827934ba22f Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 14:07:00 -0700 Subject: [PATCH 04/28] Add more logging to QuRTThread --- src/runtime/threading_backend.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 54d7d5054082..e72f06162a28 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -34,6 +34,8 @@ #endif #if defined(__hexagon__) #include +#define FARF_LOW 1 +#include #include #define HEXAGON_STACK_SIZE 65536 #endif @@ -51,6 +53,7 @@ class QuRTThread { QuRTThread(std::function worker_callback, int worker_id) : f(worker_callback), i(worker_id) { qurt_thread_attr_t attr; + FARF(LOW, "Creating worker thread %d", i); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); qurt_thread_attr_set_stack_addr(&attr, stack); @@ -59,11 +62,14 @@ class QuRTThread { bool joinable() const { return qurt_thread_get_id() != thread; } void join() { int status; + FARF(LOW, "join() called on thread id %d (worker id %d)", thread, i); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { - t->f(t->i); + FARF(LOW, "In run_func"); + qurt_thread_exit(0); + //t->f(t->i); } qurt_thread_t thread; std::function f; From f360a9fa53c9360647f93cd8e1733970f6345f8f Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 16:49:44 -0700 Subject: [PATCH 05/28] Use QuRT mutexes and condition variables --- src/runtime/thread_pool.cc | 45 ++++++++++++++++++++++++++++---- src/runtime/threading_backend.cc | 23 +++++++++++++--- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index ef1369c7496f..f7585b5bbef7 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -43,6 +43,41 @@ #include #include +#ifdef __hexagon__ +#define FARF_LOW 1 +#include +class Mutex { + public: + Mutex() { qurt_mutex_init(&mutex); } + ~Mutex() { qurt_mutex_destroy(&mutex); } + //operator=() = delete; + void lock() { qurt_mutex_lock(&mutex); } + bool try_lock() { return qurt_mutex_try_lock(&mutex) == 0; } + void unlock() { qurt_mutex_unlock(&mutex); } + qurt_mutex_t * mutex_ptr() { return &mutex; } + private: + qurt_mutex_t mutex; +}; +class ConditionVariable { + public: + ConditionVariable() { qurt_cond_init(&cond); } + ~ConditionVariable() { qurt_cond_destroy(&cond); } + void notify_all() { qurt_cond_broadcast(&cond); } + void notify_one() { qurt_cond_signal(&cond); } + template + void wait(std::unique_lock& lock, Predicate stop_waiting) { + while (!stop_waiting()) { + qurt_cond_wait(&cond, lock.mutex()->mutex_ptr()); + } + } + private: + qurt_cond_t cond; +}; +#else +typedef std::mutex Mutex; +typedef std::condition_variable ConditionVariable; +#endif + #include "../support/utils.h" const constexpr int kL1CacheBytes = 64; @@ -164,7 +199,7 @@ class SpscTaskQueue { tvm::runtime::threading::Yield(); } if (pending_.fetch_add(1) == -1) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.notify_one(); } } @@ -183,7 +218,7 @@ class SpscTaskQueue { tvm::runtime::threading::Yield(); } if (pending_.fetch_sub(1) == 0) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } if (exit_now_.load(std::memory_order_relaxed)) { @@ -201,7 +236,7 @@ class SpscTaskQueue { * \brief Signal to terminate the worker. */ void SignalForKill() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); exit_now_.store(true); cv_.notify_all(); } @@ -251,9 +286,9 @@ class SpscTaskQueue { std::atomic exit_now_{false}; // internal mutex - std::mutex mutex_; + Mutex mutex_; // cv for consumer - std::condition_variable cv_; + ConditionVariable cv_; }; // The thread pool diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index e72f06162a28..e4925be1b74f 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -59,7 +59,10 @@ class QuRTThread { qurt_thread_attr_set_stack_addr(&attr, stack); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); } - bool joinable() const { return qurt_thread_get_id() != thread; } + bool joinable() const { + FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); + return qurt_thread_get_id() != thread; + } void join() { int status; FARF(LOW, "join() called on thread id %d (worker id %d)", thread, i); @@ -68,8 +71,10 @@ class QuRTThread { private: static void run_func(QuRTThread * t) { FARF(LOW, "In run_func"); - qurt_thread_exit(0); - //t->f(t->i); + t->f(t->i); + FARF(LOW, "Leaving run_func"); + qurt_thread_exit(QURT_EOK); + FARF(LOW, "I SHOULD NOT BE HERE"); } qurt_thread_t thread; std::function f; @@ -135,6 +140,9 @@ class ThreadGroup::Impl { private: void SetThreadAffinity(std::thread::native_handle_type thread, const std::vector& ids) { +#ifdef __hexagon__ + FARF(LOW, "SetThreadAffinity called!!"); +#endif #if defined(__linux__) || defined(__ANDROID__) if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) { thread = pthread_self(); @@ -324,7 +332,14 @@ int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0 return impl_->Configure(mode, nthreads, exclude_worker0, cpus); } -void Yield() { std::this_thread::yield(); } +void Yield() { +#ifdef __hexagon__ + qurt_sleep(1); +#else + std::this_thread::yield(); +#endif +} + /*! * \brief Set the maximum number of available cores. */ From ffeaa90862fcdb5f4d208225a5873845ee12ee8c Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Mon, 11 Apr 2022 17:41:58 -0700 Subject: [PATCH 06/28] Get QuRT thread pools working perhaps --- src/runtime/thread_pool.cc | 16 ++++++++++++++++ src/runtime/threading_backend.cc | 31 +++++++++++++++---------------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index f7585b5bbef7..7424192ce252 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -46,6 +46,7 @@ #ifdef __hexagon__ #define FARF_LOW 1 #include +#define NARF(...) FARF(LOW, __VA_ARGS__) class Mutex { public: Mutex() { qurt_mutex_init(&mutex); } @@ -76,6 +77,7 @@ class ConditionVariable { #else typedef std::mutex Mutex; typedef std::condition_variable ConditionVariable; +#define NARF(...) #endif #include "../support/utils.h" @@ -332,23 +334,30 @@ class ThreadPool { << " workers=" << num_workers_used_ << " request=" << num_task; } launcher->Init(flambda, cdata, num_task, need_sync != 0); + NARF("launcher->Init returned"); SpscTaskQueue::Task tsk; tsk.launcher = launcher; // if worker0 is taken by the main, queues_[0] is abandoned for (int i = exclude_worker0_; i < num_task; ++i) { + NARF("queues_[%d]->Push(tsk)", i); tsk.task_id = i; queues_[i]->Push(tsk); } // use the main thread to run task 0 if (exclude_worker0_) { TVMParallelGroupEnv* penv = &(tsk.launcher->env); + NARF("Calling *tsk.launcher->flambda(0)"); if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) { + NARF("Calling tsk.launcher->SignalJobFinish()"); tsk.launcher->SignalJobFinish(); } else { + NARF("Calling tsk.launcher->SignalJobError()"); tsk.launcher->SignalJobError(tsk.task_id); } } + NARF("Calling launcher->WaitForJobs"); int res = launcher->WaitForJobs(); + NARF("launcher->WaitForJobs returned %d", res); return res; } @@ -388,14 +397,20 @@ class ThreadPool { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on // the global first use of the ThreadPool. // TODO(tulloch): should we make this configurable via standard APIs? + NARF("In RunWorker(%d)", worker_id); static size_t spin_count = GetSpinCount(); + NARF("Trying to pop task off of worker queue %d", worker_id); while (queue->Pop(&task, spin_count)) { + NARF("Got a task for worker %d", worker_id); ICHECK(task.launcher != nullptr); TVMParallelGroupEnv* penv = &(task.launcher->env); void* cdata = task.launcher->cdata; + NARF("Executing flambda for worker %d", worker_id); if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) { task.launcher->SignalJobFinish(); + NARF("Calling SignalJobFinish for worker %d", worker_id); } else { + NARF("Calling SignalJobError for worker %d", worker_id); task.launcher->SignalJobError(task.task_id); } } @@ -464,6 +479,7 @@ int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_ta } else { #if !TVM_THREADPOOL_USE_OPENMP int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1); + NARF("tvm::runtime::ThreadPool::ThreadLocal()->Launch returned %d", res); return res; #else if (num_task == 0) num_task = num_workers; diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index e4925be1b74f..de1dd1db3328 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -47,17 +47,18 @@ namespace runtime { namespace threading { #if defined(__hexagon__) class QuRTThread { + typedef std::function Callback; public: //template //explicit QuRTThread(Function&& f) { - QuRTThread(std::function worker_callback, int worker_id) : - f(worker_callback), i(worker_id) { - qurt_thread_attr_t attr; - FARF(LOW, "Creating worker thread %d", i); + QuRTThread(Callback worker_callback) : + f(worker_callback) { + //FARF(LOW, "Creating worker thread %d", i); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); qurt_thread_attr_set_stack_addr(&attr, stack); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); + FARF(LOW, "created thread %d", thread); } bool joinable() const { FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); @@ -65,35 +66,33 @@ class QuRTThread { } void join() { int status; - FARF(LOW, "join() called on thread id %d (worker id %d)", thread, i); + FARF(LOW, "join() called on thread id %d", thread); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { - FARF(LOW, "In run_func"); - t->f(t->i); - FARF(LOW, "Leaving run_func"); + FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); + t->f(); + FARF(LOW, "Leaving run_func %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); - FARF(LOW, "I SHOULD NOT BE HERE"); } + qurt_thread_attr_t attr; qurt_thread_t thread; - std::function f; - int i; + Callback f; uint8_t stack[HEXAGON_STACK_SIZE]; }; #endif -thread_local int max_concurrency = 0; +thread_local int max_concurrency = 3; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) : num_workers_(num_workers) { +#ifdef __hexagon__ + FARF(LOW, "num_workers requested = %d, exclude_worker0 = %d", num_workers_, exclude_worker0); +#endif ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads."; for (int i = exclude_worker0; i < num_workers_; ++i) { -#ifdef __hexagon__ - threads_.emplace_back(QuRTThread(worker_callback, i)); -#else threads_.emplace_back([worker_callback, i] { worker_callback(i); }); -#endif // __hexagon__ } InitSortedOrder(); } From bc0ef77b553f459d801fa8c487f887b47f2108b6 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Mon, 11 Apr 2022 17:58:24 -0700 Subject: [PATCH 07/28] Sleep for a little bit to let race condition bugs shine through --- src/runtime/threading_backend.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index de1dd1db3328..444303635d7c 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -72,8 +72,9 @@ class QuRTThread { private: static void run_func(QuRTThread * t) { FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); + qurt_sleep(100000); t->f(); - FARF(LOW, "Leaving run_func %d", qurt_thread_get_id()); + FARF(LOW, "Leaving run_func for thread %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); } qurt_thread_attr_t attr; From e1fd5eee3e5b559143cacab1b4d912ecfe48bf8c Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 13:01:12 -0700 Subject: [PATCH 08/28] ayeee it works! --- src/runtime/thread_pool.cc | 34 +++++------------------- src/runtime/threading_backend.cc | 45 +++++++++++++++++++++++++------- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 7424192ce252..2ab78cdd337c 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -47,33 +47,6 @@ #define FARF_LOW 1 #include #define NARF(...) FARF(LOW, __VA_ARGS__) -class Mutex { - public: - Mutex() { qurt_mutex_init(&mutex); } - ~Mutex() { qurt_mutex_destroy(&mutex); } - //operator=() = delete; - void lock() { qurt_mutex_lock(&mutex); } - bool try_lock() { return qurt_mutex_try_lock(&mutex) == 0; } - void unlock() { qurt_mutex_unlock(&mutex); } - qurt_mutex_t * mutex_ptr() { return &mutex; } - private: - qurt_mutex_t mutex; -}; -class ConditionVariable { - public: - ConditionVariable() { qurt_cond_init(&cond); } - ~ConditionVariable() { qurt_cond_destroy(&cond); } - void notify_all() { qurt_cond_broadcast(&cond); } - void notify_one() { qurt_cond_signal(&cond); } - template - void wait(std::unique_lock& lock, Predicate stop_waiting) { - while (!stop_waiting()) { - qurt_cond_wait(&cond, lock.mutex()->mutex_ptr()); - } - } - private: - qurt_cond_t cond; -}; #else typedef std::mutex Mutex; typedef std::condition_variable ConditionVariable; @@ -216,13 +189,17 @@ class SpscTaskQueue { // Busy wait a bit when the queue is empty. // If a new task comes to the queue quickly, this wait avoid the worker from sleeping. // The default spin count is set by following the typical omp convention + NARF("thread %d: Pop() ... ", qurt_thread_get_id()); for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) { tvm::runtime::threading::Yield(); } + NARF("thread %d: done spinning in Pop()", qurt_thread_get_id()); if (pending_.fetch_sub(1) == 0) { + NARF("thread %d: Nothing available yet in Pop(), waiting...", qurt_thread_get_id()); std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } + NARF("thread %d: Pop() got something", qurt_thread_get_id()); if (exit_now_.load(std::memory_order_relaxed)) { return false; } @@ -231,6 +208,7 @@ class SpscTaskQueue { ICHECK(tail_.load(std::memory_order_acquire) != head); *output = buffer_[head]; head_.store((head + 1) % kRingSize, std::memory_order_release); + NARF("thread %d: leaving Pop()", qurt_thread_get_id()); return true; } @@ -397,7 +375,7 @@ class ThreadPool { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on // the global first use of the ThreadPool. // TODO(tulloch): should we make this configurable via standard APIs? - NARF("In RunWorker(%d)", worker_id); + NARF("In RunWorker(%d), PL:TL() = %08x", worker_id, ParallelLauncher::ThreadLocal()); static size_t spin_count = GetSpinCount(); NARF("Trying to pop task off of worker queue %d", worker_id); while (queue->Pop(&task, spin_count)) { diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 444303635d7c..72b81ddac078 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -37,7 +37,9 @@ #define FARF_LOW 1 #include #include +#include #define HEXAGON_STACK_SIZE 65536 +#define HEXAGON_STACK_ALIGNMENT 32 #endif #include #include @@ -53,12 +55,30 @@ class QuRTThread { //explicit QuRTThread(Function&& f) { QuRTThread(Callback worker_callback) : f(worker_callback) { - //FARF(LOW, "Creating worker thread %d", i); + static int id = 1; + qurt_thread_attr_t attr; + char name[32]; + FARF(LOW, "Creating worker thread %d", id); + int ret = posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); + FARF(LOW, "posix_memalign returned %d, stack = %08x", ret, stack); qurt_thread_attr_init(&attr); - qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); + qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); qurt_thread_attr_set_stack_addr(&attr, stack); + snprintf(name, sizeof(name), "worker %d", id++); + qurt_thread_attr_set_name(&attr, name); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); - FARF(LOW, "created thread %d", thread); + FARF(LOW, "created thread %d (stack = %08x)", thread, stack); + } + QuRTThread(QuRTThread&& other) : + thread(other.thread), f(other.f), stack(other.stack) { + other.thread = 0; + } + ~QuRTThread() { + FARF(LOW, "~QuRTThread()"); + if (thread) { + join(); + free(stack); + } } bool joinable() const { FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); @@ -66,24 +86,23 @@ class QuRTThread { } void join() { int status; - FARF(LOW, "join() called on thread id %d", thread); + FARF(LOW, "join() called on thread id %d from thread %d", thread, qurt_thread_get_id()); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); - qurt_sleep(100000); t->f(); + qurt_sleep(100000); FARF(LOW, "Leaving run_func for thread %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); } - qurt_thread_attr_t attr; qurt_thread_t thread; Callback f; - uint8_t stack[HEXAGON_STACK_SIZE]; + void *stack; }; #endif -thread_local int max_concurrency = 3; +thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) @@ -97,7 +116,15 @@ class ThreadGroup::Impl { } InitSortedOrder(); } - ~Impl() { Join(); } + ~Impl() { +#ifdef __hexagon__ + FARF(LOW, "Calling Join() from ~Impl()"); +#endif + Join(); +#ifdef __hexagon__ + FARF(LOW, "Join() returned in ~Impl()"); +#endif + } void Join() { for (auto& t : threads_) { From 5948675559e0b82ec94dbc8118e3d90ea819c602 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 13:34:33 -0700 Subject: [PATCH 09/28] Remove custom hexagon implementations of std::mutex and std::condition_variable --- src/runtime/thread_pool.cc | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 2ab78cdd337c..1323c39e2b89 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -48,8 +48,6 @@ #include #define NARF(...) FARF(LOW, __VA_ARGS__) #else -typedef std::mutex Mutex; -typedef std::condition_variable ConditionVariable; #define NARF(...) #endif @@ -174,7 +172,7 @@ class SpscTaskQueue { tvm::runtime::threading::Yield(); } if (pending_.fetch_add(1) == -1) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.notify_one(); } } @@ -196,7 +194,7 @@ class SpscTaskQueue { NARF("thread %d: done spinning in Pop()", qurt_thread_get_id()); if (pending_.fetch_sub(1) == 0) { NARF("thread %d: Nothing available yet in Pop(), waiting...", qurt_thread_get_id()); - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } NARF("thread %d: Pop() got something", qurt_thread_get_id()); @@ -216,7 +214,7 @@ class SpscTaskQueue { * \brief Signal to terminate the worker. */ void SignalForKill() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); exit_now_.store(true); cv_.notify_all(); } @@ -266,9 +264,9 @@ class SpscTaskQueue { std::atomic exit_now_{false}; // internal mutex - Mutex mutex_; + std::mutex mutex_; // cv for consumer - ConditionVariable cv_; + std::condition_variable cv_; }; // The thread pool From deebe253c66eca71609480b912eae437b9328203 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 13:54:39 -0700 Subject: [PATCH 10/28] threading_backend.cc code cleanup --- src/runtime/threading_backend.cc | 43 ++++++-------------------------- 1 file changed, 8 insertions(+), 35 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 72b81ddac078..b77a5a25a977 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -34,8 +34,6 @@ #endif #if defined(__hexagon__) #include -#define FARF_LOW 1 -#include #include #include #define HEXAGON_STACK_SIZE 65536 @@ -47,83 +45,61 @@ namespace tvm { namespace runtime { namespace threading { -#if defined(__hexagon__) +#ifdef __hexagon__ class QuRTThread { typedef std::function Callback; public: - //template - //explicit QuRTThread(Function&& f) { QuRTThread(Callback worker_callback) : f(worker_callback) { static int id = 1; qurt_thread_attr_t attr; char name[32]; - FARF(LOW, "Creating worker thread %d", id); - int ret = posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); - FARF(LOW, "posix_memalign returned %d, stack = %08x", ret, stack); + posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); qurt_thread_attr_set_stack_addr(&attr, stack); snprintf(name, sizeof(name), "worker %d", id++); qurt_thread_attr_set_name(&attr, name); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); - FARF(LOW, "created thread %d (stack = %08x)", thread, stack); } QuRTThread(QuRTThread&& other) : thread(other.thread), f(other.f), stack(other.stack) { other.thread = 0; } ~QuRTThread() { - FARF(LOW, "~QuRTThread()"); if (thread) { join(); free(stack); } } - bool joinable() const { - FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); - return qurt_thread_get_id() != thread; - } + bool joinable() const { return qurt_thread_get_id() != thread; } void join() { int status; - FARF(LOW, "join() called on thread id %d from thread %d", thread, qurt_thread_get_id()); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { - FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); t->f(); - qurt_sleep(100000); - FARF(LOW, "Leaving run_func for thread %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); } qurt_thread_t thread; Callback f; void *stack; }; -#endif +#endif // __hexagon__ thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) : num_workers_(num_workers) { -#ifdef __hexagon__ - FARF(LOW, "num_workers requested = %d, exclude_worker0 = %d", num_workers_, exclude_worker0); -#endif ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads."; for (int i = exclude_worker0; i < num_workers_; ++i) { threads_.emplace_back([worker_callback, i] { worker_callback(i); }); } InitSortedOrder(); } - ~Impl() { -#ifdef __hexagon__ - FARF(LOW, "Calling Join() from ~Impl()"); -#endif + ~Impl() { Join(); -#ifdef __hexagon__ - FARF(LOW, "Join() returned in ~Impl()"); -#endif } void Join() { @@ -167,9 +143,6 @@ class ThreadGroup::Impl { private: void SetThreadAffinity(std::thread::native_handle_type thread, const std::vector& ids) { -#ifdef __hexagon__ - FARF(LOW, "SetThreadAffinity called!!"); -#endif #if defined(__linux__) || defined(__ANDROID__) if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) { thread = pthread_self(); @@ -248,7 +221,7 @@ class ThreadGroup::Impl { SetMasterThreadFullCpuAffinity(mode); } } - #endif // __hexagon__ +#endif // __hexagon__ } void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { @@ -262,7 +235,7 @@ class ThreadGroup::Impl { // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE, // our implementation will use kBig mode by default and will let main thread // run on intended cores. - #ifndef __hexagon__ +#ifndef __hexagon__ std::vector ids; switch (mode) { case kSpecifyOneCorePerThread: @@ -284,7 +257,7 @@ class ThreadGroup::Impl { break; } SetThreadAffinity(thread, ids); - #endif // __hexagon__ +#endif // __hexagon__ } void SetMasterThreadFullCpuAffinity(AffinityMode mode) { From b1e9265febe71acc45698539b9592e789c428d8b Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 14:35:23 -0700 Subject: [PATCH 11/28] Formatting changes --- src/runtime/threading_backend.cc | 84 ++++++++++++++++---------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index b77a5a25a977..819ac82051e5 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -46,47 +46,49 @@ namespace tvm { namespace runtime { namespace threading { #ifdef __hexagon__ +// pthreads are broken on older versions of qurt, so +// we need to use native APIs instead of std::threads class QuRTThread { typedef std::function Callback; - public: - QuRTThread(Callback worker_callback) : - f(worker_callback) { - static int id = 1; - qurt_thread_attr_t attr; - char name[32]; - posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); - qurt_thread_attr_init(&attr); - qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); - qurt_thread_attr_set_stack_addr(&attr, stack); - snprintf(name, sizeof(name), "worker %d", id++); - qurt_thread_attr_set_name(&attr, name); - qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); - } - QuRTThread(QuRTThread&& other) : - thread(other.thread), f(other.f), stack(other.stack) { - other.thread = 0; - } - ~QuRTThread() { - if (thread) { - join(); - free(stack); - } - } - bool joinable() const { return qurt_thread_get_id() != thread; } - void join() { - int status; - qurt_thread_join(thread, &status); - } - private: - static void run_func(QuRTThread * t) { - t->f(); - qurt_thread_exit(QURT_EOK); + + public: + explicit QuRTThread(Callback worker_callback) : f(worker_callback) { + static int id = 1; + qurt_thread_attr_t attr; + char name[32]; + posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); + qurt_thread_attr_init(&attr); + qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_addr(&attr, stack); + snprintf(name, sizeof(name), "worker %d", id++); + qurt_thread_attr_set_name(&attr, name); + qurt_thread_create(&thread, &attr, (void (*)(void*))run_func, this); + } + QuRTThread(QuRTThread&& other) : thread(other.thread), f(other.f), stack(other.stack) { + other.thread = 0; + } + ~QuRTThread() { + if (thread) { + join(); + free(stack); } - qurt_thread_t thread; - Callback f; - void *stack; + } + bool joinable() const { return qurt_thread_get_id() != thread; } + void join() { + int status; + qurt_thread_join(thread, &status); + } + + private: + static void run_func(QuRTThread* t) { + t->f(); + qurt_thread_exit(QURT_EOK); + } + qurt_thread_t thread; + Callback f; + void* stack; }; -#endif // __hexagon__ +#endif // __hexagon__ thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: @@ -98,9 +100,7 @@ class ThreadGroup::Impl { } InitSortedOrder(); } - ~Impl() { - Join(); - } + ~Impl() { Join(); } void Join() { for (auto& t : threads_) { @@ -221,7 +221,7 @@ class ThreadGroup::Impl { SetMasterThreadFullCpuAffinity(mode); } } -#endif // __hexagon__ +#endif // __hexagon__ } void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { @@ -257,7 +257,7 @@ class ThreadGroup::Impl { break; } SetThreadAffinity(thread, ids); -#endif // __hexagon__ +#endif // __hexagon__ } void SetMasterThreadFullCpuAffinity(AffinityMode mode) { From d27746b8ba8687587edbc1361a26f13d6af99578 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 14:39:37 -0700 Subject: [PATCH 12/28] remove hexagon debugging --- src/runtime/thread_pool.cc | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 1323c39e2b89..ef1369c7496f 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -43,14 +43,6 @@ #include #include -#ifdef __hexagon__ -#define FARF_LOW 1 -#include -#define NARF(...) FARF(LOW, __VA_ARGS__) -#else -#define NARF(...) -#endif - #include "../support/utils.h" const constexpr int kL1CacheBytes = 64; @@ -187,17 +179,13 @@ class SpscTaskQueue { // Busy wait a bit when the queue is empty. // If a new task comes to the queue quickly, this wait avoid the worker from sleeping. // The default spin count is set by following the typical omp convention - NARF("thread %d: Pop() ... ", qurt_thread_get_id()); for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) { tvm::runtime::threading::Yield(); } - NARF("thread %d: done spinning in Pop()", qurt_thread_get_id()); if (pending_.fetch_sub(1) == 0) { - NARF("thread %d: Nothing available yet in Pop(), waiting...", qurt_thread_get_id()); std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } - NARF("thread %d: Pop() got something", qurt_thread_get_id()); if (exit_now_.load(std::memory_order_relaxed)) { return false; } @@ -206,7 +194,6 @@ class SpscTaskQueue { ICHECK(tail_.load(std::memory_order_acquire) != head); *output = buffer_[head]; head_.store((head + 1) % kRingSize, std::memory_order_release); - NARF("thread %d: leaving Pop()", qurt_thread_get_id()); return true; } @@ -310,30 +297,23 @@ class ThreadPool { << " workers=" << num_workers_used_ << " request=" << num_task; } launcher->Init(flambda, cdata, num_task, need_sync != 0); - NARF("launcher->Init returned"); SpscTaskQueue::Task tsk; tsk.launcher = launcher; // if worker0 is taken by the main, queues_[0] is abandoned for (int i = exclude_worker0_; i < num_task; ++i) { - NARF("queues_[%d]->Push(tsk)", i); tsk.task_id = i; queues_[i]->Push(tsk); } // use the main thread to run task 0 if (exclude_worker0_) { TVMParallelGroupEnv* penv = &(tsk.launcher->env); - NARF("Calling *tsk.launcher->flambda(0)"); if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) { - NARF("Calling tsk.launcher->SignalJobFinish()"); tsk.launcher->SignalJobFinish(); } else { - NARF("Calling tsk.launcher->SignalJobError()"); tsk.launcher->SignalJobError(tsk.task_id); } } - NARF("Calling launcher->WaitForJobs"); int res = launcher->WaitForJobs(); - NARF("launcher->WaitForJobs returned %d", res); return res; } @@ -373,20 +353,14 @@ class ThreadPool { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on // the global first use of the ThreadPool. // TODO(tulloch): should we make this configurable via standard APIs? - NARF("In RunWorker(%d), PL:TL() = %08x", worker_id, ParallelLauncher::ThreadLocal()); static size_t spin_count = GetSpinCount(); - NARF("Trying to pop task off of worker queue %d", worker_id); while (queue->Pop(&task, spin_count)) { - NARF("Got a task for worker %d", worker_id); ICHECK(task.launcher != nullptr); TVMParallelGroupEnv* penv = &(task.launcher->env); void* cdata = task.launcher->cdata; - NARF("Executing flambda for worker %d", worker_id); if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) { task.launcher->SignalJobFinish(); - NARF("Calling SignalJobFinish for worker %d", worker_id); } else { - NARF("Calling SignalJobError for worker %d", worker_id); task.launcher->SignalJobError(task.task_id); } } @@ -455,7 +429,6 @@ int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_ta } else { #if !TVM_THREADPOOL_USE_OPENMP int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1); - NARF("tvm::runtime::ThreadPool::ThreadLocal()->Launch returned %d", res); return res; #else if (num_task == 0) num_task = num_workers; From c9ce982f02faf0e65f25dc771c8c4aa261ee6f48 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Thu, 7 Apr 2022 19:45:44 -0700 Subject: [PATCH 13/28] Initial take on adding QuRT thread support to TVM's thread pool. WIP; crashes --- src/runtime/threading_backend.cc | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 748b0b035094..4de75f276874 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -34,6 +34,8 @@ #endif #if defined(__hexagon__) #include +#include +#define HEXAGON_STACK_SIZE 65536 #endif #include #include @@ -41,6 +43,38 @@ namespace tvm { namespace runtime { namespace threading { +#if defined(__hexagon__) +class QuRTThread { + public: + //template + //explicit QuRTThread(Function&& f) { + QuRTThread(std::function worker_callback, int worker_id) : + f(worker_callback), i(worker_id) { + qurt_thread_attr_t attr; + qurt_thread_attr_init(&attr); + stack = malloc(HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_addr(&attr, stack); + qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); + } + ~QuRTThread() { + free(stack); + } + bool joinable() const { return qurt_thread_get_id() != thread; } + void join() { + int status; + qurt_thread_join(thread, &status); + } + private: + static void run_func(QuRTThread * t) { + t->f(t->i); + } + qurt_thread_t thread; + void * stack; + std::function f; + int i; +}; +#endif thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: @@ -48,7 +82,11 @@ class ThreadGroup::Impl { : num_workers_(num_workers) { ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads."; for (int i = exclude_worker0; i < num_workers_; ++i) { +#ifdef __hexagon__ + threads_.emplace_back(QuRTThread(worker_callback, i)); +#else threads_.emplace_back([worker_callback, i] { worker_callback(i); }); +#endif // __hexagon__ } InitSortedOrder(); } @@ -116,6 +154,7 @@ class ThreadGroup::Impl { // if worker 0 is offloaded to main, i.e. exclude_worker0 is true, // the main thread is bound to core 0. void SetAffinity(bool exclude_worker0, AffinityMode mode) { +#ifndef __hexagon__ const char* val = getenv("TVM_BIND_THREADS"); if (val != nullptr && atoi(val) != 1) { return; @@ -172,6 +211,7 @@ class ThreadGroup::Impl { SetMasterThreadFullCpuAffinity(mode); } } + #endif // __hexagon__ } void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { @@ -185,6 +225,7 @@ class ThreadGroup::Impl { // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE, // our implementation will use kBig mode by default and will let main thread // run on intended cores. + #ifndef __hexagon__ std::vector ids; switch (mode) { case kSpecifyOneCorePerThread: @@ -206,6 +247,7 @@ class ThreadGroup::Impl { break; } SetThreadAffinity(thread, ids); + #endif // __hexagon__ } void SetMasterThreadFullCpuAffinity(AffinityMode mode) { @@ -259,7 +301,11 @@ class ThreadGroup::Impl { } int num_workers_; +#if defined(__hexagon__) + std::vector threads_; +#else std::vector threads_; +#endif std::vector sorted_order_; int big_count_ = 0; int little_count_ = 0; From 39097ab8f93eb882c45581159747d20fd1f20c60 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 13:35:49 -0700 Subject: [PATCH 14/28] Allocate QuRT thread stacks automatically --- src/runtime/threading_backend.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 4de75f276874..803af6b40a9e 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -52,14 +52,10 @@ class QuRTThread { f(worker_callback), i(worker_id) { qurt_thread_attr_t attr; qurt_thread_attr_init(&attr); - stack = malloc(HEXAGON_STACK_SIZE); - qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); qurt_thread_attr_set_stack_addr(&attr, stack); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); } - ~QuRTThread() { - free(stack); - } bool joinable() const { return qurt_thread_get_id() != thread; } void join() { int status; @@ -73,6 +69,7 @@ class QuRTThread { void * stack; std::function f; int i; + uint8_t stack[HEXAGON_STACK_SIZE]; }; #endif thread_local int max_concurrency = 0; From 832c1255b61a535a723e7e2218dd26a35864574a Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 13:37:04 -0700 Subject: [PATCH 15/28] Remove duplicate stack in QuRTThread --- src/runtime/threading_backend.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 803af6b40a9e..54d7d5054082 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -66,7 +66,6 @@ class QuRTThread { t->f(t->i); } qurt_thread_t thread; - void * stack; std::function f; int i; uint8_t stack[HEXAGON_STACK_SIZE]; From a80713f7b6a63375f14a8e5fce641367f7a0444a Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 14:07:00 -0700 Subject: [PATCH 16/28] Add more logging to QuRTThread --- src/runtime/threading_backend.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 54d7d5054082..e72f06162a28 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -34,6 +34,8 @@ #endif #if defined(__hexagon__) #include +#define FARF_LOW 1 +#include #include #define HEXAGON_STACK_SIZE 65536 #endif @@ -51,6 +53,7 @@ class QuRTThread { QuRTThread(std::function worker_callback, int worker_id) : f(worker_callback), i(worker_id) { qurt_thread_attr_t attr; + FARF(LOW, "Creating worker thread %d", i); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); qurt_thread_attr_set_stack_addr(&attr, stack); @@ -59,11 +62,14 @@ class QuRTThread { bool joinable() const { return qurt_thread_get_id() != thread; } void join() { int status; + FARF(LOW, "join() called on thread id %d (worker id %d)", thread, i); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { - t->f(t->i); + FARF(LOW, "In run_func"); + qurt_thread_exit(0); + //t->f(t->i); } qurt_thread_t thread; std::function f; From f99d32b7d5e5f910ef97edfc663cdf00c4b2d067 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Fri, 8 Apr 2022 16:49:44 -0700 Subject: [PATCH 17/28] Use QuRT mutexes and condition variables --- src/runtime/thread_pool.cc | 45 ++++++++++++++++++++++++++++---- src/runtime/threading_backend.cc | 23 +++++++++++++--- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index ef1369c7496f..f7585b5bbef7 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -43,6 +43,41 @@ #include #include +#ifdef __hexagon__ +#define FARF_LOW 1 +#include +class Mutex { + public: + Mutex() { qurt_mutex_init(&mutex); } + ~Mutex() { qurt_mutex_destroy(&mutex); } + //operator=() = delete; + void lock() { qurt_mutex_lock(&mutex); } + bool try_lock() { return qurt_mutex_try_lock(&mutex) == 0; } + void unlock() { qurt_mutex_unlock(&mutex); } + qurt_mutex_t * mutex_ptr() { return &mutex; } + private: + qurt_mutex_t mutex; +}; +class ConditionVariable { + public: + ConditionVariable() { qurt_cond_init(&cond); } + ~ConditionVariable() { qurt_cond_destroy(&cond); } + void notify_all() { qurt_cond_broadcast(&cond); } + void notify_one() { qurt_cond_signal(&cond); } + template + void wait(std::unique_lock& lock, Predicate stop_waiting) { + while (!stop_waiting()) { + qurt_cond_wait(&cond, lock.mutex()->mutex_ptr()); + } + } + private: + qurt_cond_t cond; +}; +#else +typedef std::mutex Mutex; +typedef std::condition_variable ConditionVariable; +#endif + #include "../support/utils.h" const constexpr int kL1CacheBytes = 64; @@ -164,7 +199,7 @@ class SpscTaskQueue { tvm::runtime::threading::Yield(); } if (pending_.fetch_add(1) == -1) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.notify_one(); } } @@ -183,7 +218,7 @@ class SpscTaskQueue { tvm::runtime::threading::Yield(); } if (pending_.fetch_sub(1) == 0) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } if (exit_now_.load(std::memory_order_relaxed)) { @@ -201,7 +236,7 @@ class SpscTaskQueue { * \brief Signal to terminate the worker. */ void SignalForKill() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); exit_now_.store(true); cv_.notify_all(); } @@ -251,9 +286,9 @@ class SpscTaskQueue { std::atomic exit_now_{false}; // internal mutex - std::mutex mutex_; + Mutex mutex_; // cv for consumer - std::condition_variable cv_; + ConditionVariable cv_; }; // The thread pool diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index e72f06162a28..e4925be1b74f 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -59,7 +59,10 @@ class QuRTThread { qurt_thread_attr_set_stack_addr(&attr, stack); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); } - bool joinable() const { return qurt_thread_get_id() != thread; } + bool joinable() const { + FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); + return qurt_thread_get_id() != thread; + } void join() { int status; FARF(LOW, "join() called on thread id %d (worker id %d)", thread, i); @@ -68,8 +71,10 @@ class QuRTThread { private: static void run_func(QuRTThread * t) { FARF(LOW, "In run_func"); - qurt_thread_exit(0); - //t->f(t->i); + t->f(t->i); + FARF(LOW, "Leaving run_func"); + qurt_thread_exit(QURT_EOK); + FARF(LOW, "I SHOULD NOT BE HERE"); } qurt_thread_t thread; std::function f; @@ -135,6 +140,9 @@ class ThreadGroup::Impl { private: void SetThreadAffinity(std::thread::native_handle_type thread, const std::vector& ids) { +#ifdef __hexagon__ + FARF(LOW, "SetThreadAffinity called!!"); +#endif #if defined(__linux__) || defined(__ANDROID__) if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) { thread = pthread_self(); @@ -324,7 +332,14 @@ int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0 return impl_->Configure(mode, nthreads, exclude_worker0, cpus); } -void Yield() { std::this_thread::yield(); } +void Yield() { +#ifdef __hexagon__ + qurt_sleep(1); +#else + std::this_thread::yield(); +#endif +} + /*! * \brief Set the maximum number of available cores. */ From 85bc9b44033f4e5cfd19417356d807eecc4d5dac Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Mon, 11 Apr 2022 17:41:58 -0700 Subject: [PATCH 18/28] Get QuRT thread pools working perhaps --- src/runtime/thread_pool.cc | 16 ++++++++++++++++ src/runtime/threading_backend.cc | 31 +++++++++++++++---------------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index f7585b5bbef7..7424192ce252 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -46,6 +46,7 @@ #ifdef __hexagon__ #define FARF_LOW 1 #include +#define NARF(...) FARF(LOW, __VA_ARGS__) class Mutex { public: Mutex() { qurt_mutex_init(&mutex); } @@ -76,6 +77,7 @@ class ConditionVariable { #else typedef std::mutex Mutex; typedef std::condition_variable ConditionVariable; +#define NARF(...) #endif #include "../support/utils.h" @@ -332,23 +334,30 @@ class ThreadPool { << " workers=" << num_workers_used_ << " request=" << num_task; } launcher->Init(flambda, cdata, num_task, need_sync != 0); + NARF("launcher->Init returned"); SpscTaskQueue::Task tsk; tsk.launcher = launcher; // if worker0 is taken by the main, queues_[0] is abandoned for (int i = exclude_worker0_; i < num_task; ++i) { + NARF("queues_[%d]->Push(tsk)", i); tsk.task_id = i; queues_[i]->Push(tsk); } // use the main thread to run task 0 if (exclude_worker0_) { TVMParallelGroupEnv* penv = &(tsk.launcher->env); + NARF("Calling *tsk.launcher->flambda(0)"); if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) { + NARF("Calling tsk.launcher->SignalJobFinish()"); tsk.launcher->SignalJobFinish(); } else { + NARF("Calling tsk.launcher->SignalJobError()"); tsk.launcher->SignalJobError(tsk.task_id); } } + NARF("Calling launcher->WaitForJobs"); int res = launcher->WaitForJobs(); + NARF("launcher->WaitForJobs returned %d", res); return res; } @@ -388,14 +397,20 @@ class ThreadPool { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on // the global first use of the ThreadPool. // TODO(tulloch): should we make this configurable via standard APIs? + NARF("In RunWorker(%d)", worker_id); static size_t spin_count = GetSpinCount(); + NARF("Trying to pop task off of worker queue %d", worker_id); while (queue->Pop(&task, spin_count)) { + NARF("Got a task for worker %d", worker_id); ICHECK(task.launcher != nullptr); TVMParallelGroupEnv* penv = &(task.launcher->env); void* cdata = task.launcher->cdata; + NARF("Executing flambda for worker %d", worker_id); if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) { task.launcher->SignalJobFinish(); + NARF("Calling SignalJobFinish for worker %d", worker_id); } else { + NARF("Calling SignalJobError for worker %d", worker_id); task.launcher->SignalJobError(task.task_id); } } @@ -464,6 +479,7 @@ int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_ta } else { #if !TVM_THREADPOOL_USE_OPENMP int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1); + NARF("tvm::runtime::ThreadPool::ThreadLocal()->Launch returned %d", res); return res; #else if (num_task == 0) num_task = num_workers; diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index e4925be1b74f..de1dd1db3328 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -47,17 +47,18 @@ namespace runtime { namespace threading { #if defined(__hexagon__) class QuRTThread { + typedef std::function Callback; public: //template //explicit QuRTThread(Function&& f) { - QuRTThread(std::function worker_callback, int worker_id) : - f(worker_callback), i(worker_id) { - qurt_thread_attr_t attr; - FARF(LOW, "Creating worker thread %d", i); + QuRTThread(Callback worker_callback) : + f(worker_callback) { + //FARF(LOW, "Creating worker thread %d", i); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); qurt_thread_attr_set_stack_addr(&attr, stack); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); + FARF(LOW, "created thread %d", thread); } bool joinable() const { FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); @@ -65,35 +66,33 @@ class QuRTThread { } void join() { int status; - FARF(LOW, "join() called on thread id %d (worker id %d)", thread, i); + FARF(LOW, "join() called on thread id %d", thread); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { - FARF(LOW, "In run_func"); - t->f(t->i); - FARF(LOW, "Leaving run_func"); + FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); + t->f(); + FARF(LOW, "Leaving run_func %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); - FARF(LOW, "I SHOULD NOT BE HERE"); } + qurt_thread_attr_t attr; qurt_thread_t thread; - std::function f; - int i; + Callback f; uint8_t stack[HEXAGON_STACK_SIZE]; }; #endif -thread_local int max_concurrency = 0; +thread_local int max_concurrency = 3; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) : num_workers_(num_workers) { +#ifdef __hexagon__ + FARF(LOW, "num_workers requested = %d, exclude_worker0 = %d", num_workers_, exclude_worker0); +#endif ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads."; for (int i = exclude_worker0; i < num_workers_; ++i) { -#ifdef __hexagon__ - threads_.emplace_back(QuRTThread(worker_callback, i)); -#else threads_.emplace_back([worker_callback, i] { worker_callback(i); }); -#endif // __hexagon__ } InitSortedOrder(); } From 91d2b23c880949996d37a21041ffefee79b88d92 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Mon, 11 Apr 2022 17:58:24 -0700 Subject: [PATCH 19/28] Sleep for a little bit to let race condition bugs shine through --- src/runtime/threading_backend.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index de1dd1db3328..444303635d7c 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -72,8 +72,9 @@ class QuRTThread { private: static void run_func(QuRTThread * t) { FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); + qurt_sleep(100000); t->f(); - FARF(LOW, "Leaving run_func %d", qurt_thread_get_id()); + FARF(LOW, "Leaving run_func for thread %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); } qurt_thread_attr_t attr; From cb2104a3f8ea22c551654e6fe2ba42a566018abb Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 13:01:12 -0700 Subject: [PATCH 20/28] ayeee it works! --- src/runtime/thread_pool.cc | 34 +++++------------------- src/runtime/threading_backend.cc | 45 +++++++++++++++++++++++++------- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 7424192ce252..2ab78cdd337c 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -47,33 +47,6 @@ #define FARF_LOW 1 #include #define NARF(...) FARF(LOW, __VA_ARGS__) -class Mutex { - public: - Mutex() { qurt_mutex_init(&mutex); } - ~Mutex() { qurt_mutex_destroy(&mutex); } - //operator=() = delete; - void lock() { qurt_mutex_lock(&mutex); } - bool try_lock() { return qurt_mutex_try_lock(&mutex) == 0; } - void unlock() { qurt_mutex_unlock(&mutex); } - qurt_mutex_t * mutex_ptr() { return &mutex; } - private: - qurt_mutex_t mutex; -}; -class ConditionVariable { - public: - ConditionVariable() { qurt_cond_init(&cond); } - ~ConditionVariable() { qurt_cond_destroy(&cond); } - void notify_all() { qurt_cond_broadcast(&cond); } - void notify_one() { qurt_cond_signal(&cond); } - template - void wait(std::unique_lock& lock, Predicate stop_waiting) { - while (!stop_waiting()) { - qurt_cond_wait(&cond, lock.mutex()->mutex_ptr()); - } - } - private: - qurt_cond_t cond; -}; #else typedef std::mutex Mutex; typedef std::condition_variable ConditionVariable; @@ -216,13 +189,17 @@ class SpscTaskQueue { // Busy wait a bit when the queue is empty. // If a new task comes to the queue quickly, this wait avoid the worker from sleeping. // The default spin count is set by following the typical omp convention + NARF("thread %d: Pop() ... ", qurt_thread_get_id()); for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) { tvm::runtime::threading::Yield(); } + NARF("thread %d: done spinning in Pop()", qurt_thread_get_id()); if (pending_.fetch_sub(1) == 0) { + NARF("thread %d: Nothing available yet in Pop(), waiting...", qurt_thread_get_id()); std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } + NARF("thread %d: Pop() got something", qurt_thread_get_id()); if (exit_now_.load(std::memory_order_relaxed)) { return false; } @@ -231,6 +208,7 @@ class SpscTaskQueue { ICHECK(tail_.load(std::memory_order_acquire) != head); *output = buffer_[head]; head_.store((head + 1) % kRingSize, std::memory_order_release); + NARF("thread %d: leaving Pop()", qurt_thread_get_id()); return true; } @@ -397,7 +375,7 @@ class ThreadPool { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on // the global first use of the ThreadPool. // TODO(tulloch): should we make this configurable via standard APIs? - NARF("In RunWorker(%d)", worker_id); + NARF("In RunWorker(%d), PL:TL() = %08x", worker_id, ParallelLauncher::ThreadLocal()); static size_t spin_count = GetSpinCount(); NARF("Trying to pop task off of worker queue %d", worker_id); while (queue->Pop(&task, spin_count)) { diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 444303635d7c..72b81ddac078 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -37,7 +37,9 @@ #define FARF_LOW 1 #include #include +#include #define HEXAGON_STACK_SIZE 65536 +#define HEXAGON_STACK_ALIGNMENT 32 #endif #include #include @@ -53,12 +55,30 @@ class QuRTThread { //explicit QuRTThread(Function&& f) { QuRTThread(Callback worker_callback) : f(worker_callback) { - //FARF(LOW, "Creating worker thread %d", i); + static int id = 1; + qurt_thread_attr_t attr; + char name[32]; + FARF(LOW, "Creating worker thread %d", id); + int ret = posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); + FARF(LOW, "posix_memalign returned %d, stack = %08x", ret, stack); qurt_thread_attr_init(&attr); - qurt_thread_attr_set_stack_size(&attr, sizeof(stack)); + qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); qurt_thread_attr_set_stack_addr(&attr, stack); + snprintf(name, sizeof(name), "worker %d", id++); + qurt_thread_attr_set_name(&attr, name); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); - FARF(LOW, "created thread %d", thread); + FARF(LOW, "created thread %d (stack = %08x)", thread, stack); + } + QuRTThread(QuRTThread&& other) : + thread(other.thread), f(other.f), stack(other.stack) { + other.thread = 0; + } + ~QuRTThread() { + FARF(LOW, "~QuRTThread()"); + if (thread) { + join(); + free(stack); + } } bool joinable() const { FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); @@ -66,24 +86,23 @@ class QuRTThread { } void join() { int status; - FARF(LOW, "join() called on thread id %d", thread); + FARF(LOW, "join() called on thread id %d from thread %d", thread, qurt_thread_get_id()); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); - qurt_sleep(100000); t->f(); + qurt_sleep(100000); FARF(LOW, "Leaving run_func for thread %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); } - qurt_thread_attr_t attr; qurt_thread_t thread; Callback f; - uint8_t stack[HEXAGON_STACK_SIZE]; + void *stack; }; #endif -thread_local int max_concurrency = 3; +thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) @@ -97,7 +116,15 @@ class ThreadGroup::Impl { } InitSortedOrder(); } - ~Impl() { Join(); } + ~Impl() { +#ifdef __hexagon__ + FARF(LOW, "Calling Join() from ~Impl()"); +#endif + Join(); +#ifdef __hexagon__ + FARF(LOW, "Join() returned in ~Impl()"); +#endif + } void Join() { for (auto& t : threads_) { From 0d20028fb00c64079403bdde771f8fc89c09bede Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 13:34:33 -0700 Subject: [PATCH 21/28] Remove custom hexagon implementations of std::mutex and std::condition_variable --- src/runtime/thread_pool.cc | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 2ab78cdd337c..1323c39e2b89 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -48,8 +48,6 @@ #include #define NARF(...) FARF(LOW, __VA_ARGS__) #else -typedef std::mutex Mutex; -typedef std::condition_variable ConditionVariable; #define NARF(...) #endif @@ -174,7 +172,7 @@ class SpscTaskQueue { tvm::runtime::threading::Yield(); } if (pending_.fetch_add(1) == -1) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.notify_one(); } } @@ -196,7 +194,7 @@ class SpscTaskQueue { NARF("thread %d: done spinning in Pop()", qurt_thread_get_id()); if (pending_.fetch_sub(1) == 0) { NARF("thread %d: Nothing available yet in Pop(), waiting...", qurt_thread_get_id()); - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } NARF("thread %d: Pop() got something", qurt_thread_get_id()); @@ -216,7 +214,7 @@ class SpscTaskQueue { * \brief Signal to terminate the worker. */ void SignalForKill() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); exit_now_.store(true); cv_.notify_all(); } @@ -266,9 +264,9 @@ class SpscTaskQueue { std::atomic exit_now_{false}; // internal mutex - Mutex mutex_; + std::mutex mutex_; // cv for consumer - ConditionVariable cv_; + std::condition_variable cv_; }; // The thread pool From a0bf101d0fe506d3fc47d1e72becf301a8a19e86 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 13:54:39 -0700 Subject: [PATCH 22/28] threading_backend.cc code cleanup --- src/runtime/threading_backend.cc | 43 ++++++-------------------------- 1 file changed, 8 insertions(+), 35 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 72b81ddac078..b77a5a25a977 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -34,8 +34,6 @@ #endif #if defined(__hexagon__) #include -#define FARF_LOW 1 -#include #include #include #define HEXAGON_STACK_SIZE 65536 @@ -47,83 +45,61 @@ namespace tvm { namespace runtime { namespace threading { -#if defined(__hexagon__) +#ifdef __hexagon__ class QuRTThread { typedef std::function Callback; public: - //template - //explicit QuRTThread(Function&& f) { QuRTThread(Callback worker_callback) : f(worker_callback) { static int id = 1; qurt_thread_attr_t attr; char name[32]; - FARF(LOW, "Creating worker thread %d", id); - int ret = posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); - FARF(LOW, "posix_memalign returned %d, stack = %08x", ret, stack); + posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); qurt_thread_attr_set_stack_addr(&attr, stack); snprintf(name, sizeof(name), "worker %d", id++); qurt_thread_attr_set_name(&attr, name); qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); - FARF(LOW, "created thread %d (stack = %08x)", thread, stack); } QuRTThread(QuRTThread&& other) : thread(other.thread), f(other.f), stack(other.stack) { other.thread = 0; } ~QuRTThread() { - FARF(LOW, "~QuRTThread()"); if (thread) { join(); free(stack); } } - bool joinable() const { - FARF(LOW, "Checking if current thread %d != %d for joinability", qurt_thread_get_id(), thread); - return qurt_thread_get_id() != thread; - } + bool joinable() const { return qurt_thread_get_id() != thread; } void join() { int status; - FARF(LOW, "join() called on thread id %d from thread %d", thread, qurt_thread_get_id()); qurt_thread_join(thread, &status); } private: static void run_func(QuRTThread * t) { - FARF(LOW, "In run_func for thread %d", qurt_thread_get_id()); t->f(); - qurt_sleep(100000); - FARF(LOW, "Leaving run_func for thread %d", qurt_thread_get_id()); qurt_thread_exit(QURT_EOK); } qurt_thread_t thread; Callback f; void *stack; }; -#endif +#endif // __hexagon__ thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) : num_workers_(num_workers) { -#ifdef __hexagon__ - FARF(LOW, "num_workers requested = %d, exclude_worker0 = %d", num_workers_, exclude_worker0); -#endif ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads."; for (int i = exclude_worker0; i < num_workers_; ++i) { threads_.emplace_back([worker_callback, i] { worker_callback(i); }); } InitSortedOrder(); } - ~Impl() { -#ifdef __hexagon__ - FARF(LOW, "Calling Join() from ~Impl()"); -#endif + ~Impl() { Join(); -#ifdef __hexagon__ - FARF(LOW, "Join() returned in ~Impl()"); -#endif } void Join() { @@ -167,9 +143,6 @@ class ThreadGroup::Impl { private: void SetThreadAffinity(std::thread::native_handle_type thread, const std::vector& ids) { -#ifdef __hexagon__ - FARF(LOW, "SetThreadAffinity called!!"); -#endif #if defined(__linux__) || defined(__ANDROID__) if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) { thread = pthread_self(); @@ -248,7 +221,7 @@ class ThreadGroup::Impl { SetMasterThreadFullCpuAffinity(mode); } } - #endif // __hexagon__ +#endif // __hexagon__ } void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { @@ -262,7 +235,7 @@ class ThreadGroup::Impl { // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE, // our implementation will use kBig mode by default and will let main thread // run on intended cores. - #ifndef __hexagon__ +#ifndef __hexagon__ std::vector ids; switch (mode) { case kSpecifyOneCorePerThread: @@ -284,7 +257,7 @@ class ThreadGroup::Impl { break; } SetThreadAffinity(thread, ids); - #endif // __hexagon__ +#endif // __hexagon__ } void SetMasterThreadFullCpuAffinity(AffinityMode mode) { From 95e99d91d5868b04760755497e29e8b5ec55b0fc Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 14:35:23 -0700 Subject: [PATCH 23/28] Formatting changes --- src/runtime/threading_backend.cc | 84 ++++++++++++++++---------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index b77a5a25a977..819ac82051e5 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -46,47 +46,49 @@ namespace tvm { namespace runtime { namespace threading { #ifdef __hexagon__ +// pthreads are broken on older versions of qurt, so +// we need to use native APIs instead of std::threads class QuRTThread { typedef std::function Callback; - public: - QuRTThread(Callback worker_callback) : - f(worker_callback) { - static int id = 1; - qurt_thread_attr_t attr; - char name[32]; - posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); - qurt_thread_attr_init(&attr); - qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); - qurt_thread_attr_set_stack_addr(&attr, stack); - snprintf(name, sizeof(name), "worker %d", id++); - qurt_thread_attr_set_name(&attr, name); - qurt_thread_create(&thread, &attr, (void (*)(void *))run_func, this); - } - QuRTThread(QuRTThread&& other) : - thread(other.thread), f(other.f), stack(other.stack) { - other.thread = 0; - } - ~QuRTThread() { - if (thread) { - join(); - free(stack); - } - } - bool joinable() const { return qurt_thread_get_id() != thread; } - void join() { - int status; - qurt_thread_join(thread, &status); - } - private: - static void run_func(QuRTThread * t) { - t->f(); - qurt_thread_exit(QURT_EOK); + + public: + explicit QuRTThread(Callback worker_callback) : f(worker_callback) { + static int id = 1; + qurt_thread_attr_t attr; + char name[32]; + posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); + qurt_thread_attr_init(&attr); + qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); + qurt_thread_attr_set_stack_addr(&attr, stack); + snprintf(name, sizeof(name), "worker %d", id++); + qurt_thread_attr_set_name(&attr, name); + qurt_thread_create(&thread, &attr, (void (*)(void*))run_func, this); + } + QuRTThread(QuRTThread&& other) : thread(other.thread), f(other.f), stack(other.stack) { + other.thread = 0; + } + ~QuRTThread() { + if (thread) { + join(); + free(stack); } - qurt_thread_t thread; - Callback f; - void *stack; + } + bool joinable() const { return qurt_thread_get_id() != thread; } + void join() { + int status; + qurt_thread_join(thread, &status); + } + + private: + static void run_func(QuRTThread* t) { + t->f(); + qurt_thread_exit(QURT_EOK); + } + qurt_thread_t thread; + Callback f; + void* stack; }; -#endif // __hexagon__ +#endif // __hexagon__ thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: @@ -98,9 +100,7 @@ class ThreadGroup::Impl { } InitSortedOrder(); } - ~Impl() { - Join(); - } + ~Impl() { Join(); } void Join() { for (auto& t : threads_) { @@ -221,7 +221,7 @@ class ThreadGroup::Impl { SetMasterThreadFullCpuAffinity(mode); } } -#endif // __hexagon__ +#endif // __hexagon__ } void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { @@ -257,7 +257,7 @@ class ThreadGroup::Impl { break; } SetThreadAffinity(thread, ids); -#endif // __hexagon__ +#endif // __hexagon__ } void SetMasterThreadFullCpuAffinity(AffinityMode mode) { From 912ecc6050db1b49356c1a9ad54676fcfeeee998 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Wed, 13 Apr 2022 14:39:37 -0700 Subject: [PATCH 24/28] remove hexagon debugging --- src/runtime/thread_pool.cc | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 1323c39e2b89..ef1369c7496f 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -43,14 +43,6 @@ #include #include -#ifdef __hexagon__ -#define FARF_LOW 1 -#include -#define NARF(...) FARF(LOW, __VA_ARGS__) -#else -#define NARF(...) -#endif - #include "../support/utils.h" const constexpr int kL1CacheBytes = 64; @@ -187,17 +179,13 @@ class SpscTaskQueue { // Busy wait a bit when the queue is empty. // If a new task comes to the queue quickly, this wait avoid the worker from sleeping. // The default spin count is set by following the typical omp convention - NARF("thread %d: Pop() ... ", qurt_thread_get_id()); for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) { tvm::runtime::threading::Yield(); } - NARF("thread %d: done spinning in Pop()", qurt_thread_get_id()); if (pending_.fetch_sub(1) == 0) { - NARF("thread %d: Nothing available yet in Pop(), waiting...", qurt_thread_get_id()); std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); }); } - NARF("thread %d: Pop() got something", qurt_thread_get_id()); if (exit_now_.load(std::memory_order_relaxed)) { return false; } @@ -206,7 +194,6 @@ class SpscTaskQueue { ICHECK(tail_.load(std::memory_order_acquire) != head); *output = buffer_[head]; head_.store((head + 1) % kRingSize, std::memory_order_release); - NARF("thread %d: leaving Pop()", qurt_thread_get_id()); return true; } @@ -310,30 +297,23 @@ class ThreadPool { << " workers=" << num_workers_used_ << " request=" << num_task; } launcher->Init(flambda, cdata, num_task, need_sync != 0); - NARF("launcher->Init returned"); SpscTaskQueue::Task tsk; tsk.launcher = launcher; // if worker0 is taken by the main, queues_[0] is abandoned for (int i = exclude_worker0_; i < num_task; ++i) { - NARF("queues_[%d]->Push(tsk)", i); tsk.task_id = i; queues_[i]->Push(tsk); } // use the main thread to run task 0 if (exclude_worker0_) { TVMParallelGroupEnv* penv = &(tsk.launcher->env); - NARF("Calling *tsk.launcher->flambda(0)"); if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) { - NARF("Calling tsk.launcher->SignalJobFinish()"); tsk.launcher->SignalJobFinish(); } else { - NARF("Calling tsk.launcher->SignalJobError()"); tsk.launcher->SignalJobError(tsk.task_id); } } - NARF("Calling launcher->WaitForJobs"); int res = launcher->WaitForJobs(); - NARF("launcher->WaitForJobs returned %d", res); return res; } @@ -373,20 +353,14 @@ class ThreadPool { // Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on // the global first use of the ThreadPool. // TODO(tulloch): should we make this configurable via standard APIs? - NARF("In RunWorker(%d), PL:TL() = %08x", worker_id, ParallelLauncher::ThreadLocal()); static size_t spin_count = GetSpinCount(); - NARF("Trying to pop task off of worker queue %d", worker_id); while (queue->Pop(&task, spin_count)) { - NARF("Got a task for worker %d", worker_id); ICHECK(task.launcher != nullptr); TVMParallelGroupEnv* penv = &(task.launcher->env); void* cdata = task.launcher->cdata; - NARF("Executing flambda for worker %d", worker_id); if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) { task.launcher->SignalJobFinish(); - NARF("Calling SignalJobFinish for worker %d", worker_id); } else { - NARF("Calling SignalJobError for worker %d", worker_id); task.launcher->SignalJobError(task.task_id); } } @@ -455,7 +429,6 @@ int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_ta } else { #if !TVM_THREADPOOL_USE_OPENMP int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1); - NARF("tvm::runtime::ThreadPool::ThreadLocal()->Launch returned %d", res); return res; #else if (num_task == 0) num_task = num_workers; From e4d2b14916f575360079d3fa7d55fde2ea3c2ac7 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Mon, 18 Apr 2022 15:57:54 -0700 Subject: [PATCH 25/28] Add hexagon thread pool test --- .../contrib/test_hexagon/test_thread_pool.py | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 tests/python/contrib/test_hexagon/test_thread_pool.py diff --git a/tests/python/contrib/test_hexagon/test_thread_pool.py b/tests/python/contrib/test_hexagon/test_thread_pool.py new file mode 100644 index 000000000000..8ca11e6e2973 --- /dev/null +++ b/tests/python/contrib/test_hexagon/test_thread_pool.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import numpy as np +import pytest + +import tvm +import tvm.contrib.hexagon +import tvm.script +import tvm.testing +from tvm import te + +from .conftest import requires_hexagon_toolchain +from tvm.script import tir as T + +@tvm.script.ir_module +class ElemwiseSumIRModule: + @T.prim_func + def elemwise_sum_serial(a: T.handle, b: T.handle, c: T.handle, n: T.int32): + T.func_attr({"global_symbol": "elemwise_sum_serial", "tir.noalias": True}) + A = T.match_buffer(a, (n,), dtype="float32") + B = T.match_buffer(b, (n,), dtype="float32") + C = T.match_buffer(c, (n,), dtype="float32") + for i in T.serial(n): + with T.block("C"): + vi = T.axis.spatial(n, i) + C[vi] = A[vi] + B[vi] + @T.prim_func + def elemwise_sum_parallel(a: T.handle, b: T.handle, c: T.handle, n: T.int32): + T.func_attr({"global_symbol": "elemwise_sum_parallel", "tir.noalias": True}) + A = T.match_buffer(a, (n,), dtype="float32") + B = T.match_buffer(b, (n,), dtype="float32") + C = T.match_buffer(c, (n,), dtype="float32") + for i in T.parallel(n): + with T.block("C"): + vi = T.axis.spatial(n, i) + C[vi] = A[vi] + B[vi] + +def generate_add_test_data(hexagon_session, n=128*1024): + a = tvm.nd.array(np.random.uniform(size=n).astype("float32"), hexagon_session.device) + b = tvm.nd.array(np.random.uniform(size=n).astype("float32"), hexagon_session.device) + c = tvm.nd.array(np.zeros(n, dtype="float32"), hexagon_session.device) + return (a, b, c, n) + +def benchmark_func(mod, name, args, hexagon_session): + (a, b, c, n) = args + evaluator = mod.time_evaluator(name, hexagon_session.device, number=100) + return evaluator(a, b, c, n).mean + +@requires_hexagon_toolchain +def test_speedup(hexagon_session, capsys): + if hexagon_session is None: + pytest.skip(msg="Skip hardware test, ANDROID_SERIAL_NUMBER is not set.") + + target_hexagon = tvm.target.hexagon("v68", link_params=True) + func = tvm.build(ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon)) + mod = hexagon_session.load_module(func) + args = generate_add_test_data(hexagon_session) + parallel_mean = benchmark_func(mod, 'elemwise_sum_parallel', args, hexagon_session) + serial_mean = benchmark_func(mod, 'elemwise_sum_serial', args, hexagon_session) + + with capsys.disabled(): + print("... speedup of {:.2f}".format(serial_mean / parallel_mean), end=' ') + +@requires_hexagon_toolchain +def test_elemwise_sum_parallel(hexagon_session): + if hexagon_session is None: + pytest.skip(msg="Skip hardware test, ANDROID_SERIAL_NUMBER is not set.") + + target_hexagon = tvm.target.hexagon("v68", link_params=True) + func = tvm.build(ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon)) + mod = hexagon_session.load_module(func) + + (a, b, c, n) = generate_add_test_data(hexagon_session) + mod['elemwise_sum_parallel'](a, b, c, n) + tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy()) From 590c1b3fffea9879ae610a9f8ffcb5cba800d126 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Mon, 18 Apr 2022 17:02:29 -0700 Subject: [PATCH 26/28] style fixes for tests/python/contrib/test_hexagon/test_thread_pool.py --- .../contrib/test_hexagon/test_thread_pool.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/tests/python/contrib/test_hexagon/test_thread_pool.py b/tests/python/contrib/test_hexagon/test_thread_pool.py index 8ca11e6e2973..a05404914607 100644 --- a/tests/python/contrib/test_hexagon/test_thread_pool.py +++ b/tests/python/contrib/test_hexagon/test_thread_pool.py @@ -27,6 +27,7 @@ from .conftest import requires_hexagon_toolchain from tvm.script import tir as T + @tvm.script.ir_module class ElemwiseSumIRModule: @T.prim_func @@ -39,6 +40,7 @@ def elemwise_sum_serial(a: T.handle, b: T.handle, c: T.handle, n: T.int32): with T.block("C"): vi = T.axis.spatial(n, i) C[vi] = A[vi] + B[vi] + @T.prim_func def elemwise_sum_parallel(a: T.handle, b: T.handle, c: T.handle, n: T.int32): T.func_attr({"global_symbol": "elemwise_sum_parallel", "tir.noalias": True}) @@ -50,31 +52,37 @@ def elemwise_sum_parallel(a: T.handle, b: T.handle, c: T.handle, n: T.int32): vi = T.axis.spatial(n, i) C[vi] = A[vi] + B[vi] -def generate_add_test_data(hexagon_session, n=128*1024): + +def generate_add_test_data(hexagon_session, n=128 * 1024): a = tvm.nd.array(np.random.uniform(size=n).astype("float32"), hexagon_session.device) b = tvm.nd.array(np.random.uniform(size=n).astype("float32"), hexagon_session.device) c = tvm.nd.array(np.zeros(n, dtype="float32"), hexagon_session.device) return (a, b, c, n) + def benchmark_func(mod, name, args, hexagon_session): (a, b, c, n) = args evaluator = mod.time_evaluator(name, hexagon_session.device, number=100) return evaluator(a, b, c, n).mean + @requires_hexagon_toolchain def test_speedup(hexagon_session, capsys): if hexagon_session is None: pytest.skip(msg="Skip hardware test, ANDROID_SERIAL_NUMBER is not set.") target_hexagon = tvm.target.hexagon("v68", link_params=True) - func = tvm.build(ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon)) + func = tvm.build( + ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon) + ) mod = hexagon_session.load_module(func) args = generate_add_test_data(hexagon_session) - parallel_mean = benchmark_func(mod, 'elemwise_sum_parallel', args, hexagon_session) - serial_mean = benchmark_func(mod, 'elemwise_sum_serial', args, hexagon_session) + parallel_mean = benchmark_func(mod, "elemwise_sum_parallel", args, hexagon_session) + serial_mean = benchmark_func(mod, "elemwise_sum_serial", args, hexagon_session) with capsys.disabled(): - print("... speedup of {:.2f}".format(serial_mean / parallel_mean), end=' ') + print("... speedup of {:.2f}".format(serial_mean / parallel_mean), end=" ") + @requires_hexagon_toolchain def test_elemwise_sum_parallel(hexagon_session): @@ -82,9 +90,11 @@ def test_elemwise_sum_parallel(hexagon_session): pytest.skip(msg="Skip hardware test, ANDROID_SERIAL_NUMBER is not set.") target_hexagon = tvm.target.hexagon("v68", link_params=True) - func = tvm.build(ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon)) + func = tvm.build( + ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon) + ) mod = hexagon_session.load_module(func) (a, b, c, n) = generate_add_test_data(hexagon_session) - mod['elemwise_sum_parallel'](a, b, c, n) + mod["elemwise_sum_parallel"](a, b, c, n) tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy()) From 5576fd32a055d6b72830f5b1d1ff2aadca3daed2 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Tue, 19 Apr 2022 13:04:08 -0700 Subject: [PATCH 27/28] Fix some style issues --- src/runtime/threading_backend.cc | 35 ++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 819ac82051e5..8dd9c9d0fba4 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -52,41 +52,43 @@ class QuRTThread { typedef std::function Callback; public: - explicit QuRTThread(Callback worker_callback) : f(worker_callback) { + explicit QuRTThread(Callback worker_callback) : f_(worker_callback) { static int id = 1; qurt_thread_attr_t attr; char name[32]; - posix_memalign(&stack, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); + int ret = posix_memalign(&stack_, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); + CHECK_EQ(ret, 0); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); - qurt_thread_attr_set_stack_addr(&attr, stack); + qurt_thread_attr_set_stack_addr(&attr, stack_); snprintf(name, sizeof(name), "worker %d", id++); qurt_thread_attr_set_name(&attr, name); - qurt_thread_create(&thread, &attr, (void (*)(void*))run_func, this); + ret = qurt_thread_create(&thread_, &attr, (void (*)(void*))RunFunction, this); + CHECK_EQ(ret, QURT_EOK); } - QuRTThread(QuRTThread&& other) : thread(other.thread), f(other.f), stack(other.stack) { - other.thread = 0; + QuRTThread(QuRTThread&& other) : thread_(other.thread_), f_(other.f_), stack_(other.stack_) { + other.thread_ = 0; } ~QuRTThread() { - if (thread) { + if (thread_) { join(); - free(stack); + free(stack_); } } - bool joinable() const { return qurt_thread_get_id() != thread; } + bool joinable() const { return qurt_thread_get_id() != thread_; } void join() { int status; - qurt_thread_join(thread, &status); + qurt_thread_join(thread_, &status); } private: - static void run_func(QuRTThread* t) { - t->f(); + static void RunFunction(QuRTThread* t) { + t->f_(); qurt_thread_exit(QURT_EOK); } - qurt_thread_t thread; - Callback f; - void* stack; + qurt_thread_t thread_; + Callback f_; + void* stack_; }; #endif // __hexagon__ thread_local int max_concurrency = 0; @@ -334,6 +336,9 @@ int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0 void Yield() { #ifdef __hexagon__ + // QuRT doesn't have a yield API, so instead we sleep for the minimum amount + // of time to let the OS schedule another thread. std::this_thread::yield() + // compiles down to an empty function. qurt_sleep(1); #else std::this_thread::yield(); From 53ef297541eb642f6dec78b5eb6dfccb089a5373 Mon Sep 17 00:00:00 2001 From: Karl Koscher Date: Thu, 21 Apr 2022 16:49:20 -0700 Subject: [PATCH 28/28] Address some reviewer comments --- src/runtime/threading_backend.cc | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 8dd9c9d0fba4..b067be4752a3 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -52,12 +52,15 @@ class QuRTThread { typedef std::function Callback; public: - explicit QuRTThread(Callback worker_callback) : f_(worker_callback) { + explicit QuRTThread(Callback worker_callback) : worker_callback_(worker_callback) { static int id = 1; qurt_thread_attr_t attr; char name[32]; int ret = posix_memalign(&stack_, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE); CHECK_EQ(ret, 0); + // When a std::function<> is cast to bool, + // it indicates whether it stores a callable target + CHECK_EQ((bool)worker_callback_, true); qurt_thread_attr_init(&attr); qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE); qurt_thread_attr_set_stack_addr(&attr, stack_); @@ -66,12 +69,18 @@ class QuRTThread { ret = qurt_thread_create(&thread_, &attr, (void (*)(void*))RunFunction, this); CHECK_EQ(ret, QURT_EOK); } - QuRTThread(QuRTThread&& other) : thread_(other.thread_), f_(other.f_), stack_(other.stack_) { + QuRTThread(QuRTThread&& other) + : thread_(other.thread_), + worker_callback_(std::move(other.worker_callback_)), + stack_(other.stack_) { other.thread_ = 0; + other.stack_ = nullptr; } ~QuRTThread() { if (thread_) { join(); + } + if (stack_) { free(stack_); } } @@ -82,13 +91,13 @@ class QuRTThread { } private: - static void RunFunction(QuRTThread* t) { - t->f_(); + static void RunFunction(QuRTThread* qrt_thread) { + qrt_thread->worker_callback_(); qurt_thread_exit(QURT_EOK); } qurt_thread_t thread_; - Callback f_; - void* stack_; + Callback worker_callback_; + void* stack_ = nullptr; }; #endif // __hexagon__ thread_local int max_concurrency = 0;