From bcaf64e816cecd88b16e1090a1b1177034d9bae9 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Tue, 7 Dec 2021 21:38:05 -0500 Subject: [PATCH 01/13] Implement Work-Stealing scheduler --- src/gc.c | 2 +- src/partr.c | 242 +++++++++++++++++++++++++++++++++++++++++++----- test/threads.jl | 5 +- 3 files changed, 226 insertions(+), 23 deletions(-) diff --git a/src/gc.c b/src/gc.c index 577ac5839eb87..b13a4138b6262 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2787,7 +2787,7 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception); } -void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp); +extern void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *); extern jl_value_t *cmpswap_names JL_GLOBALLY_ROOTED; // mark the initial root set diff --git a/src/partr.c b/src/partr.c index 048a841158153..3abbe5edf588a 100644 --- a/src/partr.c +++ b/src/partr.c @@ -57,6 +57,12 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT; +// partr dynamic dispatch +void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *); +static int (*partr_enqueue_task)(jl_task_t *, int16_t); +static jl_task_t *(*partr_dequeue_task)(void); +static int (*partr_check_empty)(void); + // multiq // --- @@ -83,20 +89,6 @@ static int32_t heap_p; static uint64_t cong_unbias; -static inline void multiq_init(void) -{ - heap_p = heap_c * jl_n_threads; - heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t)); - for (int32_t i = 0; i < heap_p; ++i) { - uv_mutex_init(&heaps[i].lock); - heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*)); - jl_atomic_store_relaxed(&heaps[i].ntasks, 0); - jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX); - } - unbias_cong(heap_p, &cong_unbias); -} - - static inline void sift_up(taskheap_t *heap, int32_t idx) { if (idx > 0) { @@ -208,7 +200,7 @@ static inline jl_task_t *multiq_deletemin(void) } -void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) +void multiq_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) { int32_t i, j; for (i = 0; i < heap_p; ++i) @@ -228,6 +220,210 @@ static int multiq_check_empty(void) } +static inline void multiq_init(void) +{ + heap_p = heap_c * jl_n_threads; + heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t)); + for (int32_t i = 0; i < heap_p; ++i) { + uv_mutex_init(&heaps[i].lock); + heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*)); + jl_atomic_store_relaxed(&heaps[i].ntasks, 0); + jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX); + } + unbias_cong(heap_p, &cong_unbias); + jl_gc_mark_enqueued_tasks = &multiq_gc_mark_enqueued_tasks; + partr_enqueue_task = &multiq_insert; + partr_dequeue_task = &multiq_deletemin; + partr_check_empty = &multiq_check_empty; +} + + + +// work-stealing deque + +// The work-stealing deque by Chase and Lev (2005). Le et al. (2013) provides +// C11-complienet memory ordering. +// +// Ref: +// * Chase and Lev (2005) https://doi.org/10.1145/1073970.1073974 +// * Le et al. (2013) https://doi.org/10.1145/2442516.2442524 +// +// TODO: Dynamic buffer resizing. +typedef struct _wsdeque_t { + jl_task_t **tasks; + _Atomic(int64_t) top; + _Atomic(int64_t) bottom; + // TODO: pad +} wsdeque_t; + +static wsdeque_t *wsdeques; + + +static int wsdeque_push(jl_task_t *task, int16_t priority_ignord) +{ + int16_t tid = jl_threadid(); + int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom); + int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top); + int64_t size = b - t; + if (size >= tasks_per_heap - 1) // full + return -1; + jl_atomic_store_relaxed((_Atomic(jl_task_t **))&wsdeques[tid].tasks[b % tasks_per_heap], + task); + jl_fence_release(); + jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1); + return 0; +} + + +static jl_task_t *wsdeque_pop(void) +{ + int16_t tid = jl_threadid(); + int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom) - 1; + jl_atomic_store_relaxed(&wsdeques[tid].bottom, b); + jl_fence(); + int64_t t = jl_atomic_load_relaxed(&wsdeques[tid].top); + int64_t size = b - t; + if (size < 0) { + jl_atomic_store_relaxed(&wsdeques[tid].bottom, t); + return NULL; + } + jl_task_t *task = jl_atomic_load_relaxed( + (_Atomic(jl_task_t **))&wsdeques[tid].tasks[b % tasks_per_heap]); + if (size > 0) + return task; + if (!atomic_compare_exchange_strong_explicit(&wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) + task = NULL; + jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1); + return task; +} + + +static jl_task_t *wsdeque_steal(int16_t tid) +{ + int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top); + jl_fence(); + int64_t b = jl_atomic_load_acquire(&wsdeques[tid].bottom); + int64_t size = b - t; + if (size <= 0) + return NULL; + jl_task_t *task = jl_atomic_load_relaxed( + (_Atomic(jl_task_t **))&wsdeques[tid].tasks[t % tasks_per_heap]); + if (!atomic_compare_exchange_strong_explicit( + &wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) + return NULL; + return task; +} + + +static const int wsdeque_pop_stash = 16; + + +static jl_task_t *wsdeque_pop_or_steal(void) +{ + jl_ptls_t ptls = jl_current_task->ptls; + jl_task_t *task = NULL; + + // Try pop and lock the top `wsdeque_pop_stash` tasks in the local deque. + jl_task_t *stash[wsdeque_pop_stash]; + int n_stashed = 0; + for (; n_stashed < wsdeque_pop_stash; n_stashed++) { + task = wsdeque_pop(); + if (task != NULL) + if (!jl_set_task_tid(task, ptls->tid)) { + stash[n_stashed] = task; + task = NULL; + continue; + } + break; + } + // Put back stashed tasks in the original order; TODO: batch insert? + for (int i = n_stashed - 1; i >= 0; i--) { + int err = wsdeque_push(stash[i], 0); + (void)err; + assert(!err); + } + int pushed = n_stashed; + if (task) + goto done; + if (jl_n_threads < 2) + goto done; + + // Compute the lower bound of the number of empty slots. It's OK to be + // smaller than the actual number (which can happen when other threads steal + // some tasks). Note that `- 1` here is required since Chase-Lev deque needs + // one empty slot. + int64_t empty_slots = tasks_per_heap - 1; + if (n_stashed > 0) { + int64_t b = jl_atomic_load_relaxed(&wsdeques[ptls->tid].bottom); + int64_t t = jl_atomic_load_relaxed(&wsdeques[ptls->tid].top); + empty_slots -= b - t; + } + + int ntries = jl_n_threads; + if (ntries > empty_slots) + ntries = empty_slots; // because `wsdeque_push` below can't fail + for (int i = 0; i < ntries; ++i) { + uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed); + if (tid >= ptls->tid) + tid++; + task = wsdeque_steal(tid); + if (task != NULL) { + if (!jl_set_task_tid(task, ptls->tid)) { + int err = wsdeque_push(task, 0); + (void)err; + assert(!err); + pushed = 1; + task = NULL; + continue; + } + break; + } + } + +done: + if (pushed) + jl_wakeup_thread(-1); + return task; +} + + +void wsdeque_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) +{ + for (int i = 0; i < jl_n_threads; ++i) { + int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top); + int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom); + for (int j = t; j < b; ++j) + jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)wsdeques[i].tasks[j]); + } +} + + +static int wsdeque_check_empty(void) +{ + for (int32_t i = 0; i < jl_n_threads; ++i) { + int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top); + int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom); + int64_t size = b - t; + if (size > 0) + return 0; + } + return 1; +} + + +static void wsdeque_init(void) +{ + wsdeques = (wsdeque_t *)calloc(jl_n_threads, sizeof(wsdeque_t)); + for (int32_t i = 0; i < jl_n_threads; ++i) { + wsdeques[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t *)); + } + unbias_cong(jl_n_threads, &cong_unbias); + jl_gc_mark_enqueued_tasks = &wsdeque_gc_mark_enqueued_tasks; + partr_enqueue_task = &wsdeque_push; + partr_dequeue_task = &wsdeque_pop_or_steal; + partr_check_empty = &wsdeque_check_empty; +} + // parallel task runtime // --- @@ -236,8 +432,12 @@ static int multiq_check_empty(void) // (used only by the main thread) void jl_init_threadinginfra(void) { - /* initialize the synchronization trees pool and the multiqueue */ - multiq_init(); + /* choose and initialize the scheduler */ + char *sch = getenv("JULIA_THREAD_SCHEDULER"); + if (sch && !strncasecmp(sch, "workstealing", 12)) + wsdeque_init(); + else + multiq_init(); sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD; char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME); @@ -292,7 +492,7 @@ void jl_threadfun(void *arg) // enqueue the specified task for execution JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task) { - if (multiq_insert(task, task->prio) == -1) + if (partr_enqueue_task(task, task->prio) == -1) return 1; return 0; } @@ -419,7 +619,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q) jl_set_task_tid(task, self); return task; } - return multiq_deletemin(); + return partr_dequeue_task(); } static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT @@ -444,7 +644,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) // quick, race-y check to see if there seems to be any stuff in there jl_cpu_pause(); - if (!multiq_check_empty()) { + if (!partr_check_empty()) { start_cycles = 0; continue; } @@ -453,7 +653,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) jl_ptls_t ptls = ct->ptls; if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) { jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock - if (!multiq_check_empty()) { + if (!partr_check_empty()) { if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us continue; diff --git a/test/threads.jl b/test/threads.jl index 4464c2a2c8859..66410f270c60d 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -24,9 +24,12 @@ let lk = ReentrantLock() end let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threads_exec.jl` - for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races + for test_nthreads in (1, 2, 4, 4), # run once to try single-threaded mode, then try a couple times to trigger bad races + sch in ("depthfirst", "workstealing") + new_env = copy(ENV) new_env["JULIA_NUM_THREADS"] = string(test_nthreads) + new_env["JULIA_THREAD_SCHEDULER"] = sch run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) end end From f2b47d251436471aba6e3c2622f6862065aac352 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 02:50:14 -0500 Subject: [PATCH 02/13] DEBUG: Default to workstealing for testing --- src/partr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/partr.c b/src/partr.c index 3abbe5edf588a..31e8f25eefc8a 100644 --- a/src/partr.c +++ b/src/partr.c @@ -434,7 +434,7 @@ void jl_init_threadinginfra(void) { /* choose and initialize the scheduler */ char *sch = getenv("JULIA_THREAD_SCHEDULER"); - if (sch && !strncasecmp(sch, "workstealing", 12)) + if (!sch || !strncasecmp(sch, "workstealing", 12)) wsdeque_init(); else multiq_init(); From b702ba9033debccc74b5b5bf6e11150c6cefc6c4 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 14:59:34 -0500 Subject: [PATCH 03/13] Pad and align wsdeques --- src/partr.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/partr.c b/src/partr.c index 31e8f25eefc8a..4182c2fa41486 100644 --- a/src/partr.c +++ b/src/partr.c @@ -14,7 +14,6 @@ extern "C" { #endif - // thread sleep state // default to DEFAULT_THREAD_SLEEP_THRESHOLD; set via $JULIA_THREAD_SLEEP_THRESHOLD @@ -250,10 +249,14 @@ static inline void multiq_init(void) // // TODO: Dynamic buffer resizing. typedef struct _wsdeque_t { - jl_task_t **tasks; - _Atomic(int64_t) top; - _Atomic(int64_t) bottom; - // TODO: pad + union { + struct { + jl_task_t **tasks; + _Atomic(int64_t) top; + _Atomic(int64_t) bottom; + }; + uint8_t padding[JL_CACHE_BYTE_ALIGNMENT]; + }; } wsdeque_t; static wsdeque_t *wsdeques; @@ -413,7 +416,11 @@ static int wsdeque_check_empty(void) static void wsdeque_init(void) { - wsdeques = (wsdeque_t *)calloc(jl_n_threads, sizeof(wsdeque_t)); + // Manually align the pointer since `jl_malloc_aligned` is not available here. + wsdeques = (wsdeque_t *)(((uintptr_t)calloc(1, sizeof(wsdeque_t) * jl_n_threads + + JL_CACHE_BYTE_ALIGNMENT - 1) + + JL_CACHE_BYTE_ALIGNMENT - 1) & + (-JL_CACHE_BYTE_ALIGNMENT)); for (int32_t i = 0; i < jl_n_threads; ++i) { wsdeques[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t *)); } From a730ab1bc5a11dac5ffb549d96a3057db1a3ac99 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 15:53:40 -0500 Subject: [PATCH 04/13] Use `jl_atomic_cmpswap` instead of C11 API --- src/partr.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/partr.c b/src/partr.c index 4182c2fa41486..37b1554eafc40 100644 --- a/src/partr.c +++ b/src/partr.c @@ -294,7 +294,7 @@ static jl_task_t *wsdeque_pop(void) (_Atomic(jl_task_t **))&wsdeques[tid].tasks[b % tasks_per_heap]); if (size > 0) return task; - if (!atomic_compare_exchange_strong_explicit(&wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) + if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1)) task = NULL; jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1); return task; @@ -311,8 +311,7 @@ static jl_task_t *wsdeque_steal(int16_t tid) return NULL; jl_task_t *task = jl_atomic_load_relaxed( (_Atomic(jl_task_t **))&wsdeques[tid].tasks[t % tasks_per_heap]); - if (!atomic_compare_exchange_strong_explicit( - &wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) + if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1)) return NULL; return task; } From dd7d5c2c5c47059f52151b0fdd5671c0899c9249 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 15:55:08 -0500 Subject: [PATCH 05/13] Fix casts to _Atomic --- src/partr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/partr.c b/src/partr.c index 37b1554eafc40..d9dbd4331896d 100644 --- a/src/partr.c +++ b/src/partr.c @@ -291,7 +291,7 @@ static jl_task_t *wsdeque_pop(void) return NULL; } jl_task_t *task = jl_atomic_load_relaxed( - (_Atomic(jl_task_t **))&wsdeques[tid].tasks[b % tasks_per_heap]); + (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap]); if (size > 0) return task; if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1)) @@ -310,7 +310,7 @@ static jl_task_t *wsdeque_steal(int16_t tid) if (size <= 0) return NULL; jl_task_t *task = jl_atomic_load_relaxed( - (_Atomic(jl_task_t **))&wsdeques[tid].tasks[t % tasks_per_heap]); + (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]); if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1)) return NULL; return task; From 36a84362ca558d46c3de5873f31dd3fdb0a34630 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 16:50:19 -0500 Subject: [PATCH 06/13] Fix wsdeque_gc_mark_enqueued_tasks --- src/partr.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/partr.c b/src/partr.c index d9dbd4331896d..b31a03f9d54f9 100644 --- a/src/partr.c +++ b/src/partr.c @@ -395,7 +395,8 @@ void wsdeque_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_ int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top); int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom); for (int j = t; j < b; ++j) - jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)wsdeques[i].tasks[j]); + jl_gc_mark_queue_obj_explicit( + gc_cache, sp, (jl_value_t *)wsdeques[i].tasks[j % tasks_per_heap]); } } From b1faf76de86b7f1ebcc919b731f25c6cbea4fc1b Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 16:52:23 -0500 Subject: [PATCH 07/13] Fix another cast to _Atomic --- src/partr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/partr.c b/src/partr.c index b31a03f9d54f9..9aaa1b19783ff 100644 --- a/src/partr.c +++ b/src/partr.c @@ -270,8 +270,8 @@ static int wsdeque_push(jl_task_t *task, int16_t priority_ignord) int64_t size = b - t; if (size >= tasks_per_heap - 1) // full return -1; - jl_atomic_store_relaxed((_Atomic(jl_task_t **))&wsdeques[tid].tasks[b % tasks_per_heap], - task); + jl_atomic_store_relaxed( + (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task); jl_fence_release(); jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1); return 0; From f94ca11b57cd8599b116ef2fd177fdbec60ecf7b Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Thu, 9 Dec 2021 17:45:46 -0500 Subject: [PATCH 08/13] Mention JULIA_THREAD_SCHEDULER in documentation --- doc/src/manual/environment-variables.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/doc/src/manual/environment-variables.md b/doc/src/manual/environment-variables.md index 08d00f3a8cae4..e413308f10ce2 100644 --- a/doc/src/manual/environment-variables.md +++ b/doc/src/manual/environment-variables.md @@ -235,6 +235,16 @@ If set to anything besides `0`, then Julia's thread policy is consistent with running on a dedicated machine: the master thread is on proc 0, and threads are affinitized. Otherwise, Julia lets the operating system handle thread policy. +### `JULIA_THREAD_SCHEDULER` + +If set to `workstealing` (case insensitive), Julia uses the work-stealing +scheduler for scheduling thread-enabled tasks (e.g., tasks created by +`Threads.@spawn`). This is only an advisory mechanism and the interface may be +changed or removed in the future. + +!!! compat "Julia 1.8" + `JULIA_THREAD_SCHEDULER` requires at least Julia 1.8. + ## REPL formatting Environment variables that determine how REPL output should be formatted at the From ecec557ae6c0faba3efc53dd848b8e8c4f7cc1f6 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Thu, 9 Dec 2021 17:49:53 -0500 Subject: [PATCH 09/13] Mention JULIA_THREAD_SCHEDULER in NEWS --- NEWS.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/NEWS.md b/NEWS.md index 408dce72dd322..2d3fd96ecd858 100644 --- a/NEWS.md +++ b/NEWS.md @@ -36,6 +36,8 @@ Compiler/Runtime improvements `libjulia-codegen`. It is loaded by default, so normal usage should see no changes. In deployments that do not need the compiler (e.g. system images where all needed code is precompiled), this library (and its LLVM dependency) can simply be excluded ([#41936]). +* The environment variable `JULIA_THREAD_SCHEDULER` can be set to `workstealing` to use the + alternative work-stealing scheduler or threaded tasks ([#43366]). Command-line option changes --------------------------- From a4a20cbb0d4fee90ca6061da98952b8206c4d523 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Thu, 9 Dec 2021 17:53:16 -0500 Subject: [PATCH 10/13] Revert "DEBUG: Default to workstealing for testing" This reverts commit f2b47d251436471aba6e3c2622f6862065aac352. --- src/partr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/partr.c b/src/partr.c index 9aaa1b19783ff..301f6bd21ac26 100644 --- a/src/partr.c +++ b/src/partr.c @@ -441,7 +441,7 @@ void jl_init_threadinginfra(void) { /* choose and initialize the scheduler */ char *sch = getenv("JULIA_THREAD_SCHEDULER"); - if (!sch || !strncasecmp(sch, "workstealing", 12)) + if (sch && !strncasecmp(sch, "workstealing", 12)) wsdeque_init(); else multiq_init(); From e86241111340ad8d767d55717a58acf02b2e3c32 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Wed, 8 Dec 2021 02:50:14 -0500 Subject: [PATCH 11/13] DEBUG: Default to workstealing for testing --- src/partr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/partr.c b/src/partr.c index 301f6bd21ac26..9aaa1b19783ff 100644 --- a/src/partr.c +++ b/src/partr.c @@ -441,7 +441,7 @@ void jl_init_threadinginfra(void) { /* choose and initialize the scheduler */ char *sch = getenv("JULIA_THREAD_SCHEDULER"); - if (sch && !strncasecmp(sch, "workstealing", 12)) + if (!sch || !strncasecmp(sch, "workstealing", 12)) wsdeque_init(); else multiq_init(); From 531e4792f154f1927377fe7708382e7ab20a2ba5 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Fri, 10 Dec 2021 20:13:45 -0500 Subject: [PATCH 12/13] Put "straggler" task in sticky queue --- src/partr.c | 68 ++++++++--------------------------------------------- 1 file changed, 10 insertions(+), 58 deletions(-) diff --git a/src/partr.c b/src/partr.c index 9aaa1b19783ff..2fd16e1c93362 100644 --- a/src/partr.c +++ b/src/partr.c @@ -272,6 +272,10 @@ static int wsdeque_push(jl_task_t *task, int16_t priority_ignord) return -1; jl_atomic_store_relaxed( (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task); + if (jl_atomic_load_acquire(&task->tid) != -1) + // If the `task` still hasn't finished the context switch at this point, abort push + // and put it in the sticky queue. + return -1; jl_fence_release(); jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1); return 0; @@ -317,75 +321,23 @@ static jl_task_t *wsdeque_steal(int16_t tid) } -static const int wsdeque_pop_stash = 16; - - static jl_task_t *wsdeque_pop_or_steal(void) { jl_ptls_t ptls = jl_current_task->ptls; - jl_task_t *task = NULL; - - // Try pop and lock the top `wsdeque_pop_stash` tasks in the local deque. - jl_task_t *stash[wsdeque_pop_stash]; - int n_stashed = 0; - for (; n_stashed < wsdeque_pop_stash; n_stashed++) { - task = wsdeque_pop(); - if (task != NULL) - if (!jl_set_task_tid(task, ptls->tid)) { - stash[n_stashed] = task; - task = NULL; - continue; - } - break; - } - // Put back stashed tasks in the original order; TODO: batch insert? - for (int i = n_stashed - 1; i >= 0; i--) { - int err = wsdeque_push(stash[i], 0); - (void)err; - assert(!err); - } - int pushed = n_stashed; - if (task) - goto done; - if (jl_n_threads < 2) - goto done; - - // Compute the lower bound of the number of empty slots. It's OK to be - // smaller than the actual number (which can happen when other threads steal - // some tasks). Note that `- 1` here is required since Chase-Lev deque needs - // one empty slot. - int64_t empty_slots = tasks_per_heap - 1; - if (n_stashed > 0) { - int64_t b = jl_atomic_load_relaxed(&wsdeques[ptls->tid].bottom); - int64_t t = jl_atomic_load_relaxed(&wsdeques[ptls->tid].top); - empty_slots -= b - t; - } + jl_task_t *task = wsdeque_pop(); + if (task || jl_n_threads < 2) + return task; int ntries = jl_n_threads; - if (ntries > empty_slots) - ntries = empty_slots; // because `wsdeque_push` below can't fail for (int i = 0; i < ntries; ++i) { uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed); if (tid >= ptls->tid) tid++; task = wsdeque_steal(tid); - if (task != NULL) { - if (!jl_set_task_tid(task, ptls->tid)) { - int err = wsdeque_push(task, 0); - (void)err; - assert(!err); - pushed = 1; - task = NULL; - continue; - } - break; - } + if (task) + return task; } - -done: - if (pushed) - jl_wakeup_thread(-1); - return task; + return NULL; } From 0e50d013553f0ff54f41ac305be57cf29660cb99 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Sat, 11 Dec 2021 00:48:00 -0500 Subject: [PATCH 13/13] Revert "DEBUG: Default to workstealing for testing" This reverts commit f2b47d251436471aba6e3c2622f6862065aac352. --- src/partr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/partr.c b/src/partr.c index 2fd16e1c93362..b687ca7c8e0ea 100644 --- a/src/partr.c +++ b/src/partr.c @@ -393,7 +393,7 @@ void jl_init_threadinginfra(void) { /* choose and initialize the scheduler */ char *sch = getenv("JULIA_THREAD_SCHEDULER"); - if (!sch || !strncasecmp(sch, "workstealing", 12)) + if (sch && !strncasecmp(sch, "workstealing", 12)) wsdeque_init(); else multiq_init();