From 531e4792f154f1927377fe7708382e7ab20a2ba5 Mon Sep 17 00:00:00 2001 From: Takafumi Arakaki Date: Fri, 10 Dec 2021 20:13:45 -0500 Subject: [PATCH] Put "straggler" task in sticky queue --- src/partr.c | 68 ++++++++--------------------------------------------- 1 file changed, 10 insertions(+), 58 deletions(-) diff --git a/src/partr.c b/src/partr.c index 9aaa1b19783ff..2fd16e1c93362 100644 --- a/src/partr.c +++ b/src/partr.c @@ -272,6 +272,10 @@ static int wsdeque_push(jl_task_t *task, int16_t priority_ignord) return -1; jl_atomic_store_relaxed( (_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task); + if (jl_atomic_load_acquire(&task->tid) != -1) + // If the `task` still hasn't finished the context switch at this point, abort push + // and put it in the sticky queue. + return -1; jl_fence_release(); jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1); return 0; @@ -317,75 +321,23 @@ static jl_task_t *wsdeque_steal(int16_t tid) } -static const int wsdeque_pop_stash = 16; - - static jl_task_t *wsdeque_pop_or_steal(void) { jl_ptls_t ptls = jl_current_task->ptls; - jl_task_t *task = NULL; - - // Try pop and lock the top `wsdeque_pop_stash` tasks in the local deque. - jl_task_t *stash[wsdeque_pop_stash]; - int n_stashed = 0; - for (; n_stashed < wsdeque_pop_stash; n_stashed++) { - task = wsdeque_pop(); - if (task != NULL) - if (!jl_set_task_tid(task, ptls->tid)) { - stash[n_stashed] = task; - task = NULL; - continue; - } - break; - } - // Put back stashed tasks in the original order; TODO: batch insert? - for (int i = n_stashed - 1; i >= 0; i--) { - int err = wsdeque_push(stash[i], 0); - (void)err; - assert(!err); - } - int pushed = n_stashed; - if (task) - goto done; - if (jl_n_threads < 2) - goto done; - - // Compute the lower bound of the number of empty slots. It's OK to be - // smaller than the actual number (which can happen when other threads steal - // some tasks). Note that `- 1` here is required since Chase-Lev deque needs - // one empty slot. - int64_t empty_slots = tasks_per_heap - 1; - if (n_stashed > 0) { - int64_t b = jl_atomic_load_relaxed(&wsdeques[ptls->tid].bottom); - int64_t t = jl_atomic_load_relaxed(&wsdeques[ptls->tid].top); - empty_slots -= b - t; - } + jl_task_t *task = wsdeque_pop(); + if (task || jl_n_threads < 2) + return task; int ntries = jl_n_threads; - if (ntries > empty_slots) - ntries = empty_slots; // because `wsdeque_push` below can't fail for (int i = 0; i < ntries; ++i) { uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed); if (tid >= ptls->tid) tid++; task = wsdeque_steal(tid); - if (task != NULL) { - if (!jl_set_task_tid(task, ptls->tid)) { - int err = wsdeque_push(task, 0); - (void)err; - assert(!err); - pushed = 1; - task = NULL; - continue; - } - break; - } + if (task) + return task; } - -done: - if (pushed) - jl_wakeup_thread(-1); - return task; + return NULL; }