Skip to content

Commit

Permalink
Put "straggler" task in sticky queue
Browse files Browse the repository at this point in the history
  • Loading branch information
tkf committed Dec 11, 2021
1 parent e862411 commit 531e479
Showing 1 changed file with 10 additions and 58 deletions.
68 changes: 10 additions & 58 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ static int wsdeque_push(jl_task_t *task, int16_t priority_ignord)
return -1;
jl_atomic_store_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task);
if (jl_atomic_load_acquire(&task->tid) != -1)
// If the `task` still hasn't finished the context switch at this point, abort push
// and put it in the sticky queue.
return -1;
jl_fence_release();
jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
return 0;
Expand Down Expand Up @@ -317,75 +321,23 @@ static jl_task_t *wsdeque_steal(int16_t tid)
}


static const int wsdeque_pop_stash = 16;


static jl_task_t *wsdeque_pop_or_steal(void)
{
jl_ptls_t ptls = jl_current_task->ptls;
jl_task_t *task = NULL;

// Try pop and lock the top `wsdeque_pop_stash` tasks in the local deque.
jl_task_t *stash[wsdeque_pop_stash];
int n_stashed = 0;
for (; n_stashed < wsdeque_pop_stash; n_stashed++) {
task = wsdeque_pop();
if (task != NULL)
if (!jl_set_task_tid(task, ptls->tid)) {
stash[n_stashed] = task;
task = NULL;
continue;
}
break;
}
// Put back stashed tasks in the original order; TODO: batch insert?
for (int i = n_stashed - 1; i >= 0; i--) {
int err = wsdeque_push(stash[i], 0);
(void)err;
assert(!err);
}
int pushed = n_stashed;
if (task)
goto done;
if (jl_n_threads < 2)
goto done;

// Compute the lower bound of the number of empty slots. It's OK to be
// smaller than the actual number (which can happen when other threads steal
// some tasks). Note that `- 1` here is required since Chase-Lev deque needs
// one empty slot.
int64_t empty_slots = tasks_per_heap - 1;
if (n_stashed > 0) {
int64_t b = jl_atomic_load_relaxed(&wsdeques[ptls->tid].bottom);
int64_t t = jl_atomic_load_relaxed(&wsdeques[ptls->tid].top);
empty_slots -= b - t;
}
jl_task_t *task = wsdeque_pop();
if (task || jl_n_threads < 2)
return task;

int ntries = jl_n_threads;
if (ntries > empty_slots)
ntries = empty_slots; // because `wsdeque_push` below can't fail
for (int i = 0; i < ntries; ++i) {
uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed);
if (tid >= ptls->tid)
tid++;
task = wsdeque_steal(tid);
if (task != NULL) {
if (!jl_set_task_tid(task, ptls->tid)) {
int err = wsdeque_push(task, 0);
(void)err;
assert(!err);
pushed = 1;
task = NULL;
continue;
}
break;
}
if (task)
return task;
}

done:
if (pushed)
jl_wakeup_thread(-1);
return task;
return NULL;
}


Expand Down

0 comments on commit 531e479

Please sign in to comment.