RFC: Work-stealing scheduler #43366

Closed
wants to merge 14 commits
2 changes: 1 addition & 1 deletion src/gc.c
@@ -2787,7 +2787,7 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception);
}

void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp);
extern void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *);
extern jl_value_t *cmpswap_names JL_GLOBALLY_ROOTED;

// mark the initial root set
242 changes: 221 additions & 21 deletions src/partr.c
@@ -57,6 +57,12 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT
extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT;

// partr dynamic dispatch
void (*jl_gc_mark_enqueued_tasks)(jl_gc_mark_cache_t *, jl_gc_mark_sp_t *);
static int (*partr_enqueue_task)(jl_task_t *, int16_t);
static jl_task_t *(*partr_dequeue_task)(void);
static int (*partr_check_empty)(void);

// multiq
// ---

@@ -83,20 +89,6 @@ static int32_t heap_p;
static uint64_t cong_unbias;


static inline void multiq_init(void)
{
heap_p = heap_c * jl_n_threads;
heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t));
for (int32_t i = 0; i < heap_p; ++i) {
uv_mutex_init(&heaps[i].lock);
heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*));
jl_atomic_store_relaxed(&heaps[i].ntasks, 0);
jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX);
}
unbias_cong(heap_p, &cong_unbias);
}


static inline void sift_up(taskheap_t *heap, int32_t idx)
{
if (idx > 0) {
@@ -208,7 +200,7 @@ static inline jl_task_t *multiq_deletemin(void)
}


void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
void multiq_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
int32_t i, j;
for (i = 0; i < heap_p; ++i)
@@ -228,6 +220,210 @@ static int multiq_check_empty(void)
}


static inline void multiq_init(void)
{
heap_p = heap_c * jl_n_threads;
heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t));
for (int32_t i = 0; i < heap_p; ++i) {
uv_mutex_init(&heaps[i].lock);
heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*));
jl_atomic_store_relaxed(&heaps[i].ntasks, 0);
jl_atomic_store_relaxed(&heaps[i].prio, INT16_MAX);
}
unbias_cong(heap_p, &cong_unbias);
jl_gc_mark_enqueued_tasks = &multiq_gc_mark_enqueued_tasks;
partr_enqueue_task = &multiq_insert;
partr_dequeue_task = &multiq_deletemin;
partr_check_empty = &multiq_check_empty;
}



// work-stealing deque

// The work-stealing deque of Chase and Lev (2005). Le et al. (2013) provide
// a C11-compliant memory ordering.
//
// Ref:
// * Chase and Lev (2005) https://doi.org/10.1145/1073970.1073974
// * Le et al. (2013) https://doi.org/10.1145/2442516.2442524
//
// TODO: Dynamic buffer resizing.
typedef struct _wsdeque_t {
jl_task_t **tasks;
_Atomic(int64_t) top;
_Atomic(int64_t) bottom;
// TODO: pad
} wsdeque_t;

static wsdeque_t *wsdeques;


static int wsdeque_push(jl_task_t *task, int16_t priority_ignored)
{
int16_t tid = jl_threadid();
int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom);
int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
Member:

What is being load-acquired? There are no stores in wsdeque_steal

Suggested change:
-    int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
+    int64_t t = jl_atomic_load(&wsdeques[tid].top); // ensures that `tasks` is successfully stolen before we try to reuse the slot below

Member Author (@tkf):

I'm trying to digest this and I realized that I still don't get it. As we discussed, there is a CAS on .top in wsdeque_steal

julia/src/partr.c

Lines 316 to 318 in 0e50d01

jl_task_t *task = jl_atomic_load_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
if (!jl_atomic_cmpswap(&wsdeques[tid].top, &t, t + 1))

The loaded task only matters in the success path, where we have a seq_cst write (which is a superset of a release write). So, we have:

  • a sequenced-before edge from the load of .tasks[t % tasks_per_heap] to the store to .top via the CAS on the next line (as they both happen in the same thread)
  • a synchronizes-with edge from that store to .top to the load of .top in wsdeque_push that you quoted above

So they establish happens-before, and it looks like we know that the task is loaded by the time we load .top? IIRC, your concern was that loading .tasks[t % tasks_per_heap] and storing .top can be reordered, but doesn't release forbid that?

A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store.
https://en.cppreference.com/w/cpp/atomic/memory_order

Member:

I am not sure. But edges are not transitive, unless all of them are seq-cst, IIUC.

Member:

But yes, looks like we have a proper release/acquire pair here on top to ensure the ops on tasks are okay.

Member Author (@tkf, Jan 27, 2022):

Hmm.... Lahav et al.'s definition of happens-before hb is the transitive closure of the union of sequenced-before sb and synchronizes-with sw:

    hb ≜ (sb ∪ sw)⁺

where R⁺ is the transitive closure of R, i.e. R⁺ ≜ ⋃_{i ≥ 1} Rⁱ.

But I don't know if that's a mismatch between their definition and the C11 semantics (though they use the same notions for discussing C11, so probably not) and/or some difference to the actual definition in the standard.
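
To make the argument concrete, here is the same reasoning reduced to two variables in plain C11 atomics. This is a discussion sketch only, not code from this PR; `slot` and `top` are stand-ins for tasks[t % tasks_per_heap] and wsdeques[tid].top.

#include <stdatomic.h>

static _Atomic(void *) slot;   // stands for wsdeques[tid].tasks[t % tasks_per_heap]
static atomic_long top;        // stands for wsdeques[tid].top

// Thief side (cf. wsdeque_steal): the relaxed load of `slot` is sequenced before
// the CAS on `top`, and a successful seq_cst CAS includes a release store, so the
// load cannot be reordered after it.
void *thief(long t)
{
    void *task = atomic_load_explicit(&slot, memory_order_relaxed);
    if (atomic_compare_exchange_strong(&top, &t, t + 1)) // seq_cst, hence also release
        return task;
    return NULL;
}

// Owner side (cf. wsdeque_push): an acquire load of `top` that observes t + 1
// synchronizes with the CAS above, so the thief's read of `slot` happens before
// the owner reuses that slot.
long owner(void)
{
    return atomic_load_explicit(&top, memory_order_acquire);
}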

int64_t size = b - t;
if (size >= tasks_per_heap - 1) // full
return -1;
jl_atomic_store_relaxed((_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap],
task);
jl_fence_release();
Member:

The only store was the relaxed to tasks, so maybe just make that a release store?
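
For reference, my recollection of push in Le et al. (2013) is fence-based too: a relaxed store into the buffer, a release fence, then a relaxed publish of bottom, which is the shape jl_fence_release() gives here. The sketch below is reconstructed from memory (the struct and names are mine, and the size check/resizing are omitted), so please check it against the paper before relying on it.

#include <stdatomic.h>

typedef struct {
    _Atomic(void *) *buffer;   // circular task buffer
    atomic_long top, bottom;
    long size;
} cl_deque_t;

// Chase-Lev push with the C11 orderings as I remember them from Le et al. (2013):
// the buffer slot is written relaxed, and the release fence orders that write
// before the (relaxed) publication of the new bottom.
static void cl_push(cl_deque_t *q, void *task)
{
    long b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    atomic_store_explicit(&q->buffer[b % q->size], task, memory_order_relaxed);
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}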

jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
return 0;
}


static jl_task_t *wsdeque_pop(void)
{
int16_t tid = jl_threadid();
int64_t b = jl_atomic_load_relaxed(&wsdeques[tid].bottom) - 1;
jl_atomic_store_relaxed(&wsdeques[tid].bottom, b);
jl_fence();
int64_t t = jl_atomic_load_relaxed(&wsdeques[tid].top);
int64_t size = b - t;
if (size < 0) {
jl_atomic_store_relaxed(&wsdeques[tid].bottom, t);
return NULL;
}
jl_task_t *task = jl_atomic_load_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap]);
if (size > 0)
return task;
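// Exactly one task left: race any concurrent thief by CASing top forward;
// if the thief wins, give up the task, then restore bottom either way.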
if (!atomic_compare_exchange_strong_explicit(&wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
task = NULL;
jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
return task;
}


static jl_task_t *wsdeque_steal(int16_t tid)
{
int64_t t = jl_atomic_load_acquire(&wsdeques[tid].top);
jl_fence();
int64_t b = jl_atomic_load_acquire(&wsdeques[tid].bottom);
int64_t size = b - t;
if (size <= 0)
return NULL;
jl_task_t *task = jl_atomic_load_relaxed(
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[t % tasks_per_heap]);
if (!atomic_compare_exchange_strong_explicit(
&wsdeques[tid].top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
Member:

Doesn't this shrink the workqueue from the top? And thus every time we steal we lose space on top?

Member Author (@tkf):

It uses the .tasks buffer as a circular array so it's OK that the .top is never decremented. (In fact, that's the main cleverness of the Chase-Lev deque.)
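
A tiny self-contained illustration of that, assuming an 8-slot buffer (not code from this PR):

#include <stdio.h>

// top and bottom only grow, but a task lives in physical slot (index % SLOTS),
// so the space "consumed" at the top by steals is reused as bottom advances.
int main(void)
{
    enum { SLOTS = 8 };
    long top = 10, bottom = 14;   // 4 live tasks at logical indices 10..13
    printf("thief reads physical slot %ld, then CASes top to %ld\n",
           top % SLOTS, top + 1);                    // slot 2
    printf("owner's next push lands in physical slot %ld\n",
           bottom % SLOTS);                          // slot 6, then 7, 0, 1, ...
    // Invariant: bottom - top < SLOTS, so live logical indices never collide
    // on the same physical slot and no capacity is permanently lost.
    return 0;
}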

return NULL;
return task;
Member:

Suggested change:
-    return task;
+    if (jl_set_task_tid(task))
+        return task;
+    wsdeque_push(task, 1); // FIXME: the sticky queue would be a better place for this, but we need to handle that inside Julia instead of handling these conditions here
+    return NULL;

}


static const int wsdeque_pop_stash = 16;


static jl_task_t *wsdeque_pop_or_steal(void)
{
jl_ptls_t ptls = jl_current_task->ptls;
jl_task_t *task = NULL;

// Try pop and lock the top `wsdeque_pop_stash` tasks in the local deque.
jl_task_t *stash[wsdeque_pop_stash];
int n_stashed = 0;
for (; n_stashed < wsdeque_pop_stash; n_stashed++) {
task = wsdeque_pop();
if (task != NULL)
if (!jl_set_task_tid(task, ptls->tid)) {
stash[n_stashed] = task;
task = NULL;
continue;
}
break;
}
// Put back stashed tasks in the original order; TODO: batch insert?
for (int i = n_stashed - 1; i >= 0; i--) {
int err = wsdeque_push(stash[i], 0);
(void)err;
assert(!err);
}
int pushed = n_stashed;
if (task)
goto done;
if (jl_n_threads < 2)
goto done;

// Compute a lower bound on the number of empty slots. It's OK if this is
// smaller than the actual number (which can happen when other threads steal
// some tasks). Note that the `- 1` here is required since the Chase-Lev
// deque needs one empty slot.
int64_t empty_slots = tasks_per_heap - 1;
if (n_stashed > 0) {
int64_t b = jl_atomic_load_relaxed(&wsdeques[ptls->tid].bottom);
int64_t t = jl_atomic_load_relaxed(&wsdeques[ptls->tid].top);
empty_slots -= b - t;
}

int ntries = jl_n_threads;
if (ntries > empty_slots)
ntries = empty_slots; // so that `wsdeque_push` below can't fail
for (int i = 0; i < ntries; ++i) {
uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed);
if (tid >= ptls->tid)
tid++;
task = wsdeque_steal(tid);
if (task != NULL) {
if (!jl_set_task_tid(task, ptls->tid)) {
int err = wsdeque_push(task, 0);
(void)err;
assert(!err);
pushed = 1;
task = NULL;
continue;
}
break;
}
}

done:
if (pushed)
jl_wakeup_thread(-1);
return task;
}


void wsdeque_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
for (int i = 0; i < jl_n_threads; ++i) {
int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top);
int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom);
for (int64_t j = t; j < b; ++j)
jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)wsdeques[i].tasks[j % tasks_per_heap]);
}
}


static int wsdeque_check_empty(void)
{
for (int32_t i = 0; i < jl_n_threads; ++i) {
int64_t t = jl_atomic_load_relaxed(&wsdeques[i].top);
int64_t b = jl_atomic_load_relaxed(&wsdeques[i].bottom);
int64_t size = b - t;
if (size > 0)
return 0;
}
return 1;
}


static void wsdeque_init(void)
{
wsdeques = (wsdeque_t *)calloc(jl_n_threads, sizeof(wsdeque_t));
for (int32_t i = 0; i < jl_n_threads; ++i) {
wsdeques[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t *));
}
unbias_cong(jl_n_threads, &cong_unbias);
jl_gc_mark_enqueued_tasks = &wsdeque_gc_mark_enqueued_tasks;
partr_enqueue_task = &wsdeque_push;
partr_dequeue_task = &wsdeque_pop_or_steal;
partr_check_empty = &wsdeque_check_empty;
}


// parallel task runtime
// ---
@@ -236,8 +432,12 @@ static int multiq_check_empty(void)
// (used only by the main thread)
void jl_init_threadinginfra(void)
{
/* initialize the synchronization trees pool and the multiqueue */
multiq_init();
/* choose and initialize the scheduler */
char *sch = getenv("JULIA_THREAD_SCHEDULER");
if (!sch || !strncasecmp(sch, "workstealing", 12))
wsdeque_init();
else
multiq_init();

sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD;
char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME);
@@ -292,7 +492,7 @@ void jl_threadfun(void *arg)
// enqueue the specified task for execution
JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task)
{
if (multiq_insert(task, task->prio) == -1)
if (partr_enqueue_task(task, task->prio) == -1)
return 1;
return 0;
}
@@ -419,7 +619,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
jl_set_task_tid(task, self);
return task;
}
return multiq_deletemin();
return partr_dequeue_task();
}

static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
@@ -444,7 +644,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)

// quick, race-y check to see if there seems to be any stuff in there
jl_cpu_pause();
if (!multiq_check_empty()) {
if (!partr_check_empty()) {
start_cycles = 0;
continue;
}
@@ -453,7 +653,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
jl_ptls_t ptls = ct->ptls;
if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
if (!multiq_check_empty()) {
if (!partr_check_empty()) {
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
continue;
5 changes: 4 additions & 1 deletion test/threads.jl
@@ -24,9 +24,12 @@ let lk = ReentrantLock()
end

let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threads_exec.jl`
for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races
for test_nthreads in (1, 2, 4, 4), # run once to try single-threaded mode, then try a couple times to trigger bad races
sch in ("depthfirst", "workstealing")

new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = string(test_nthreads)
new_env["JULIA_THREAD_SCHEDULER"] = sch
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end
end