GC/Parallel marking #44643

Closed · wants to merge 28 commits
285 changes: 141 additions & 144 deletions src/gc-debug.c

Large diffs are not rendered by default.

563 changes: 356 additions & 207 deletions src/gc.c

Large diffs are not rendered by default.

53 changes: 26 additions & 27 deletions src/gc.h
@@ -217,28 +217,35 @@ union _jl_gc_mark_data {
    gc_mark_finlist_t finlist;
};

-// Pop a data struct from the mark data stack (i.e. decrease the stack pointer)
-// This should be used after dispatch and therefore the pc stack pointer is already popped from
-// the stack.
-STATIC_INLINE void *gc_pop_markdata_(jl_gc_mark_sp_t *sp, size_t size)
+// Return a pointer to the bottom of the data queue
+STATIC_INLINE void *gc_mark_deque_data_bottom(jl_gc_ws_queue_t *mark_queue) JL_NOTSAFEPOINT
{
-    jl_gc_mark_data_t *data = (jl_gc_mark_data_t *)(((char*)sp->data) - size);
-    sp->data = data;
-    return data;
+    jl_gc_ws_bottom_t bottom = jl_atomic_load_relaxed(&mark_queue->bottom);
+    jl_gc_ws_array_t *array = jl_atomic_load_relaxed(&mark_queue->array);
+    return &array->data_start[bottom.data_offset % array->size];
}
-#define gc_pop_markdata(sp, type) ((type*)gc_pop_markdata_(sp, sizeof(type)))

-// Re-push a frame to the mark stack (both data and pc)
-// The data and pc are expected to be on the stack (or updated in place) already.
+// Re-push a frame to the mark queue (both data and pc)
+// The data and pc are expected to be on the queue (or updated in place) already.
// Mainly useful to pause the current scanning in order to scan a new object.
-STATIC_INLINE void *gc_repush_markdata_(jl_gc_mark_sp_t *sp, size_t size) JL_NOTSAFEPOINT
+STATIC_INLINE void *gc_mark_deque_repush(jl_gc_ws_queue_t *mark_queue) JL_NOTSAFEPOINT
{
-    jl_gc_mark_data_t *data = sp->data;
-    sp->pc++;
-    sp->data = (jl_gc_mark_data_t *)(((char*)sp->data) + size);
+    jl_gc_ws_bottom_t bottom = jl_atomic_load_relaxed(&mark_queue->bottom);
+    jl_gc_ws_array_t *array = jl_atomic_load_relaxed(&mark_queue->array);
+    jl_gc_mark_data_t *data = &array->data_start[bottom.data_offset % array->size];
+    bottom.pc_offset++;
+    bottom.data_offset++;
+    jl_atomic_store_relaxed(&mark_queue->bottom, bottom);
    return data;
}
-#define gc_repush_markdata(sp, type) ((type*)gc_repush_markdata_(sp, sizeof(type)))

+// Used to determine whether the bottom of pc/data queue should be incremented
+// on a push
+typedef enum {
+    no_inc,
+    inc,
+    inc_data_only
+} jl_gc_push_mode_t;

// layout for big (>2k) objects
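The `jl_gc_push_mode_t` values above control how a push advances the `bottom` cursors. As an illustration only (the real push lives in `src/gc.c`, whose diff is not rendered on this page), here is one plausible owner-side push; reading `inc_data_only` as "several data records sharing one pc slot" is an assumption drawn from the names:

```c
/* Illustrative only: a plausible owner-side push for the work-stealing
 * queue; the real implementation is in src/gc.c (not rendered here). */
STATIC_INLINE void gc_mark_deque_push(jl_gc_ws_queue_t *q, void *pc,
                                      void *data, size_t size,
                                      jl_gc_push_mode_t mode)
{
    jl_gc_ws_bottom_t bottom = jl_atomic_load_relaxed(&q->bottom);
    jl_gc_ws_array_t *array = jl_atomic_load_relaxed(&q->array);
    // Write into the current bottom slots; no_inc stops here, which
    // amounts to overwriting the frame in place (cf. gc_mark_deque_repush)
    array->pc_start[bottom.pc_offset % array->size] = pc;
    memcpy(&array->data_start[bottom.data_offset % array->size], data, size);
    if (mode == no_inc)
        return;
    if (mode == inc)
        bottom.pc_offset++; // claim a fresh pc slot as well
    bottom.data_offset++;   // assumption: inc_data_only packs several
                            // data records under a single pc entry
    jl_atomic_store_relaxed(&q->bottom, bottom);
}
```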

Expand Down Expand Up @@ -505,18 +512,10 @@ STATIC_INLINE void gc_big_object_link(bigval_t *hdr, bigval_t **list) JL_NOTSAFE
    *list = hdr;
}

-STATIC_INLINE void gc_mark_sp_init(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
-{
-    sp->pc = gc_cache->pc_stack;
-    sp->data = gc_cache->data_stack;
-    sp->pc_start = gc_cache->pc_stack;
-    sp->pc_end = gc_cache->pc_stack_end;
-}
-
-void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_mark_sp_t *sp);
-void gc_mark_queue_finlist(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp,
+void gc_mark_queue_all_roots(jl_ptls_t ptls);
+void gc_mark_queue_finlist(jl_gc_mark_cache_t *gc_cache,
                            arraylist_t *list, size_t start);
-void gc_mark_loop(jl_ptls_t ptls, jl_gc_mark_sp_t sp);
+void gc_mark_loop(jl_ptls_t ptls);
void sweep_stack_pools(void);
void jl_gc_debug_init(void);
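The signature changes above drop the explicit `jl_gc_mark_sp_t`: mark state now lives in each thread's `ptls->gc_cache.mark_queue`. A hypothetical driver (not code from this PR; the real call sites are in the unrendered `src/gc.c` diff) showing how the new entry points fit together:

```c
/* Hypothetical driver, not part of this PR: the real call sites are in
 * src/gc.c. All mark state is now reached through ptls, not an sp. */
static void gc_mark_all_roots_and_drain(jl_ptls_t ptls)
{
    gc_mark_queue_all_roots(ptls);           // push global and thread roots
    gc_mark_queue_finlist(&ptls->gc_cache,   // push this thread's finalizer list
                          &ptls->finalizers, 0);
    gc_mark_loop(ptls);                      // drain the queue (stealing when idle)
}
```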

@@ -648,7 +647,7 @@ extern int gc_verifying;
#endif
int gc_slot_to_fieldidx(void *_obj, void *slot);
int gc_slot_to_arrayidx(void *_obj, void *begin);
-NOINLINE void gc_mark_loop_unwind(jl_ptls_t ptls, jl_gc_mark_sp_t sp, int pc_offset);
+// NOINLINE void gc_mark_loop_unwind(jl_ptls_t ptls, int pc_offset);

#ifdef GC_DEBUG_ENV
JL_DLLEXPORT extern jl_gc_debug_env_t jl_gc_debug_env;
4 changes: 3 additions & 1 deletion src/julia_internal.h
@@ -832,7 +832,7 @@ typedef jl_gcframe_t ***(*jl_pgcstack_key_t)(void) JL_NOTSAFEPOINT;
#endif
JL_DLLEXPORT void jl_pgcstack_getkey(jl_get_pgcstack_func **f, jl_pgcstack_key_t *k);

-#if !defined(__clang_gcanalyzer__) && !defined(_OS_DARWIN_)
+#if !defined(__clang_gcanalyzer__)
static inline void jl_set_gc_and_wait(void)
{
    jl_task_t *ct = jl_current_task;
@@ -1570,6 +1570,8 @@ JL_DLLEXPORT uint16_t julia__truncdfhf2(double param) JL_NOTSAFEPOINT;
#define JL_PROBE_GC_STOP_THE_WORLD() do ; while (0)
#define JL_PROBE_GC_MARK_BEGIN() do ; while (0)
#define JL_PROBE_GC_MARK_END(scanned_bytes, perm_scanned_bytes) do ; while (0)
+#define JL_PROBE_GC_MARK_STOP_THE_WORLD_SWEEP_BEGIN() do ; while (0)
+#define JL_PROBE_GC_MARK_STOP_THE_WORLD_SWEEP_END() do ; while (0)
#define JL_PROBE_GC_SWEEP_BEGIN(full) do ; while (0)
#define JL_PROBE_GC_SWEEP_END() do ; while (0)
#define JL_PROBE_GC_END() do ; while (0)
45 changes: 36 additions & 9 deletions src/julia_threads.h
@@ -173,11 +173,25 @@ typedef struct {
typedef union _jl_gc_mark_data jl_gc_mark_data_t;

typedef struct {
-    void **pc; // Current stack address for the pc (up growing)
-    jl_gc_mark_data_t *data; // Current stack address for the data (up growing)
-    void **pc_start; // Cached value of `gc_cache->pc_stack`
-    void **pc_end; // Cached value of `gc_cache->pc_stack_end`
-} jl_gc_mark_sp_t;
+    int32_t offset, version;
+} jl_gc_ws_top_t;
+
+typedef struct {
+    int32_t pc_offset, data_offset;
+} jl_gc_ws_bottom_t;
+
+typedef struct {
+    void **pc_start;
+    jl_gc_mark_data_t *data_start;
+    size_t size;
+} jl_gc_ws_array_t;
+
+typedef struct {
+    _Atomic(jl_gc_ws_top_t) top;
+    _Atomic(jl_gc_ws_bottom_t) bottom;
+    _Atomic(jl_gc_ws_array_t *) array;
+    arraylist_t *reclaim_set;
+} jl_gc_ws_queue_t;

typedef struct {
    // thread local increment of `perm_scanned_bytes`
@@ -195,9 +209,7 @@
    // this makes sure that a single object can only appear once in
    // the lists (the mark bit cannot be flipped to `0` without sweeping)
    void *big_obj[1024];
-    void **pc_stack;
-    void **pc_stack_end;
-    jl_gc_mark_data_t *data_stack;
+    jl_gc_ws_queue_t mark_queue;
} jl_gc_mark_cache_t;

struct _jl_bt_element_t;
@@ -217,6 +229,9 @@ typedef struct _jl_tls_states_t {
#define JL_GC_STATE_SAFE 2
// gc_state = 2 means the thread is running unmanaged code that can be
// executed at the same time as the GC.
+#define JL_GC_STATE_PARALLEL 3
+// gc_state = 3 means the thread is doing GC work that can be executed
+// concurrently on multiple threads.
    _Atomic(int8_t) gc_state; // read from foreign threads
    // execution of certain impure
    // statements is prohibited from certain
@@ -264,7 +279,6 @@
    arraylist_t finalizers;
    jl_gc_mark_cache_t gc_cache;
    arraylist_t sweep_objs;
-    jl_gc_mark_sp_t gc_mark_sp;
    // Saved exception for previous *external* API call or NULL if cleared.
    // Access via jl_exception_occurred().
    struct _jl_value_t *previous_exception;
@@ -357,7 +371,20 @@ int8_t jl_gc_safe_leave(jl_ptls_t ptls, int8_t state); // Can be a safepoint
#define jl_gc_safe_enter(ptls) jl_gc_state_save_and_set(ptls, JL_GC_STATE_SAFE)
#define jl_gc_safe_leave(ptls, state) ((void)jl_gc_state_set(ptls, (state), JL_GC_STATE_SAFE))
#endif
+#define jl_gc_mark_loop_enter(ptls) do { \
+        jl_atomic_fetch_add(&nworkers_marking, 1); \
+        jl_fence(); \
+        jl_atomic_store_release(&ptls->gc_state, JL_GC_STATE_PARALLEL); \
+    } while (0)
+#define jl_gc_mark_loop_leave(ptls) do { \
+        jl_atomic_store_release(&ptls->gc_state, JL_GC_STATE_WAITING); \
+        jl_fence(); \
+        jl_atomic_fetch_add(&nworkers_marking, -1); \
+    } while (0)
JL_DLLEXPORT void (jl_gc_safepoint)(void);
+// Either NULL, or the address of a function that threads can call while
+// waiting for the GC, which will recruit them into a concurrent GC operation.
+extern _Atomic(void *) jl_gc_recruiting_location;

JL_DLLEXPORT void jl_gc_enable_finalizers(struct _jl_task_t *ct, int on);
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void);
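A note on the queue layout above: `top` packs a 32-bit `offset` with a 32-bit `version` so the pair fits in a single 64-bit CAS, the usual tag for avoiding ABA when thieves race with the owner; `bottom` keeps separate pc/data cursors; and `reclaim_set` defers freeing of outgrown arrays until no thief can still hold a pointer into them. A sketch of the steal this layout implies, illustrative only since the actual operations live in the unrendered `src/gc.c` diff, and simplified to ignore the paired data record a real steal must also transfer:

```c
/* Illustrative thief-side steal; the real operations live in src/gc.c.
 * Returns the stolen pc slot, or NULL if the queue looked empty or the
 * CAS lost a race. Bumping version on every steal keeps a reused offset
 * distinguishable across array wrap-arounds (ABA). */
static void **gc_mark_deque_steal(jl_gc_ws_queue_t *q)
{
    jl_gc_ws_top_t top = jl_atomic_load_acquire(&q->top);
    jl_gc_ws_bottom_t bottom = jl_atomic_load_acquire(&q->bottom);
    if (bottom.pc_offset - top.offset <= 0)
        return NULL;                        // nothing to steal
    jl_gc_ws_array_t *array = jl_atomic_load_relaxed(&q->array);
    void **pc = &array->pc_start[top.offset % array->size];
    jl_gc_ws_top_t new_top = { top.offset + 1, top.version + 1 };
    if (!jl_atomic_cmpswap(&q->top, &top, new_top))
        return NULL;                        // another thief or the owner won
    return pc;                              // a real steal also moves the data record
}
```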
14 changes: 9 additions & 5 deletions src/partr.c
@@ -47,8 +47,8 @@ uint64_t io_wakeup_enter;
uint64_t io_wakeup_leave;
);

-uv_mutex_t *sleep_locks;
-uv_cond_t *wake_signals;
+uv_mutex_t *sleep_locks, *safepoint_sleep_locks;
+uv_cond_t *wake_signals, *safepoint_wake_signals;

JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT
{
@@ -70,8 +70,8 @@ JL_DLLEXPORT int jl_set_task_threadpoolid(jl_task_t *task, int8_t tpid) JL_NOTSA
}

// GC functions used
-extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
-                                         jl_gc_mark_sp_t *sp, jl_value_t *obj) JL_NOTSAFEPOINT;
+extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache,
+                                         jl_value_t *obj) JL_NOTSAFEPOINT;

// parallel task runtime
// ---
@@ -99,10 +99,14 @@ void jl_init_threadinginfra(void)

    int16_t tid;
    sleep_locks = (uv_mutex_t*)calloc(jl_n_threads, sizeof(uv_mutex_t));
+   safepoint_sleep_locks = (uv_mutex_t*)calloc(jl_n_threads, sizeof(uv_mutex_t));
    wake_signals = (uv_cond_t*)calloc(jl_n_threads, sizeof(uv_cond_t));
+   safepoint_wake_signals = (uv_cond_t*)calloc(jl_n_threads, sizeof(uv_cond_t));
    for (tid = 0; tid < jl_n_threads; tid++) {
        uv_mutex_init(&sleep_locks[tid]);
+       uv_mutex_init(&safepoint_sleep_locks[tid]);
        uv_cond_init(&wake_signals[tid]);
+       uv_cond_init(&safepoint_wake_signals[tid]);
    }
}

@@ -409,7 +413,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
            uv_mutex_lock(&sleep_locks[ptls->tid]);
            while (may_sleep(ptls)) {
                uv_cond_wait(&wake_signals[ptls->tid], &sleep_locks[ptls->tid]);
-               // TODO: help with gc work here, if applicable
+               jl_safepoint_wait_gc();
            }
            assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
            uv_mutex_unlock(&sleep_locks[ptls->tid]);
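The new wait loop in `src/safepoint.c` below leans on a libuv convention worth spelling out: `uv_cond_timedwait` takes its timeout in nanoseconds and returns 0 when signaled or `UV_ETIMEDOUT` on timeout, so `if (!uv_cond_timedwait(...))` means "we were woken deliberately". A tiny standalone illustration:

```c
#include <uv.h>

/* Standalone illustration of the return convention the safepoint wait
 * loop relies on. uv_cond_timedwait takes nanoseconds and returns 0 when
 * signaled, UV_ETIMEDOUT when the timeout expired. */
static void wait_for_wakeup_once(uv_mutex_t *lock, uv_cond_t *cond)
{
    uv_mutex_lock(lock);
    if (uv_cond_timedwait(cond, lock, 500) == 0) {
        // Signaled: this is the branch where the safepoint loop
        // calls jl_safepoint_try_recruit
    }
    else {
        // UV_ETIMEDOUT: caller loops around and retries
    }
    uv_mutex_unlock(lock);
}
```

With the `timeout_ns = 500` declared below, each wait lasts at most half a microsecond, so idle threads are effectively in a bounded spin between recruitment checks.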
131 changes: 122 additions & 9 deletions src/safepoint.c
@@ -44,6 +44,15 @@ uint8_t jl_safepoint_enable_cnt[3] = {0, 0, 0};
// fight on the safepoint lock...
uv_mutex_t safepoint_lock;

+_Atomic(void *) jl_gc_recruiting_location = NULL;
+_Atomic(int32_t) jl_gc_safepoint_master = -1;
+_Atomic(int32_t) nworkers_marking = 0;
+
+extern uv_mutex_t *safepoint_sleep_locks;
+extern uv_cond_t *safepoint_wake_signals;
+
+const uint64_t timeout_ns = 500;
+
static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT
{
    // safepoint_lock should be held
@@ -146,21 +155,125 @@ void jl_safepoint_end_gc(void)
    jl_safepoint_disable(2);
    jl_safepoint_disable(1);
    jl_atomic_store_release(&jl_gc_running, 0);
-#  ifdef __APPLE__
-    // This wakes up other threads on mac.
-    jl_mach_gc_end();
-#  endif
    uv_mutex_unlock(&safepoint_lock);
}

+// Thread recruitment scheme inspired by Hassanein,
+// `Understanding and Improving JVM GC Work Stealing at the
+// Data Center Scale`
+
+void jl_safepoint_try_recruit(jl_ptls_t ptls)
+{
+    if (jl_atomic_load_relaxed(&jl_gc_recruiting_location)) {
+        jl_gc_mark_loop_enter(ptls);
+        void *location = jl_atomic_load_acquire(&jl_gc_recruiting_location);
+        if (location)
+            ((void (*)(jl_ptls_t))location)(ptls);
+        jl_gc_mark_loop_leave(ptls);
+    }
+}

+size_t jl_safepoint_master_count_work(jl_ptls_t ptls)
+{
+    size_t work = 0;
+    for (int i = 0; i < jl_n_threads; i++) {
+        if (i == ptls->tid)
+            continue;
+        jl_ptls_t ptls2 = jl_all_tls_states[i];
+        if (jl_atomic_load_relaxed(&ptls2->gc_state) == JL_GC_STATE_PARALLEL) {
+            jl_gc_mark_cache_t *gc_cache2 = &ptls2->gc_cache;
+            jl_gc_ws_queue_t *mark_queue2 = &gc_cache2->mark_queue;
+            // This count can be slightly off, but it doesn't matter
+            // for recruitment heuristics
+            jl_gc_ws_bottom_t bottom2 = jl_atomic_load_relaxed(&mark_queue2->bottom);
+            jl_gc_ws_top_t top2 = jl_atomic_load_relaxed(&mark_queue2->top);
+            work += bottom2.pc_offset - top2.offset;
+        }
+    }
+    return work;
+}

+void jl_safepoint_master_notify_all(jl_ptls_t ptls)
+{
+    for (int i = 0; i < jl_n_threads; i++) {
+        if (i == ptls->tid)
+            continue;
+        uv_mutex_lock(&safepoint_sleep_locks[i]);
+        uv_cond_signal(&safepoint_wake_signals[i]);
+        uv_mutex_unlock(&safepoint_sleep_locks[i]);
+    }
+}

+void jl_safepoint_master_recruit_workers(jl_ptls_t ptls, size_t nworkers)
+{
+    for (int i = 0; i < jl_n_threads && nworkers > 0; i++) {
+        if (i == ptls->tid)
+            continue;
+        jl_ptls_t ptls2 = jl_all_tls_states[i];
+        if (jl_atomic_load_acquire(&ptls2->gc_state) == JL_GC_STATE_WAITING) {
+            uv_mutex_lock(&safepoint_sleep_locks[i]);
+            uv_cond_signal(&safepoint_wake_signals[i]);
+            uv_mutex_unlock(&safepoint_sleep_locks[i]);
+            nworkers--;
+        }
+    }
+}

+int jl_safepoint_master_end_marking(jl_ptls_t ptls)
+{
+    // All workers done with marking
+    if (jl_atomic_load_acquire(&nworkers_marking) == 0)
+        return 1;
+    int no_master = -1;
+    if (jl_atomic_cmpswap(&jl_gc_safepoint_master, &no_master, ptls->tid)) {
+        spin: {
+            if (jl_atomic_load_acquire(&nworkers_marking) > 0) {
+                size_t work = jl_safepoint_master_count_work(ptls);
+                // If there is enough work, recruit workers and also become a worker,
+                // relinquishing the safepoint master status
+                if (work > 2) {
+                    jl_safepoint_master_recruit_workers(ptls, work - 1);
+                    jl_atomic_store_release(&jl_gc_safepoint_master, -1);
+                    jl_safepoint_try_recruit(ptls);
+                    return 0;
+                }
+                goto spin;
+            }
+        }
+        jl_atomic_store_release(&jl_gc_safepoint_master, -1);
+        jl_safepoint_master_notify_all(ptls);
+        return 1;
+    }
+    return 0;
+}

void jl_safepoint_wait_gc(void)
{
-    // The thread should have set this is already
-    assert(jl_atomic_load_relaxed(&jl_current_task->ptls->gc_state) != 0);
-    // Use normal volatile load in the loop for speed until GC finishes.
-    // Then use an acquire load to make sure the GC result is visible on this thread.
+    jl_ptls_t ptls = jl_current_task->ptls;
    while (jl_atomic_load_relaxed(&jl_gc_running) || jl_atomic_load_acquire(&jl_gc_running)) {
-        jl_cpu_pause(); // yield?
+        if (jl_safepoint_master_end_marking(ptls)) {
+            // Clean up buffers from `reclaim_set`
+            jl_gc_mark_cache_t *gc_cache = &ptls->gc_cache;
+            jl_gc_ws_queue_t *mark_queue = &gc_cache->mark_queue;
+            arraylist_t *rs = mark_queue->reclaim_set;
+            jl_gc_ws_array_t *a;
+            while ((a = (jl_gc_ws_array_t*)arraylist_pop(rs))) {
+                free(a->pc_start);
+                free(a->data_start);
+                free(a);
+            }
+            break;
+        }
+        uv_mutex_lock(&safepoint_sleep_locks[ptls->tid]);
+        if (!uv_cond_timedwait(&safepoint_wake_signals[ptls->tid],
+                               &safepoint_sleep_locks[ptls->tid], timeout_ns)) {
+            // Stopped waiting because we got a notification
+            // from the safepoint master: try to get recruited
+            jl_safepoint_try_recruit(ptls);
+        }
+        uv_mutex_unlock(&safepoint_sleep_locks[ptls->tid]);
+        // Otherwise, just go to the top of the loop and try
+        // to become a safepoint master
    }
}
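Putting the pieces together: a worker enters marking via `jl_gc_mark_loop_enter`, which increments `nworkers_marking` and fences before publishing `JL_GC_STATE_PARALLEL`, so the safepoint master can never observe the parallel state without the counter already raised. The collecting side that actually stores into `jl_gc_recruiting_location` is in the unrendered `src/gc.c` diff; the following is a hypothetical sketch of that driver, using only functions shown above, and assuming the location holds a `void (*)(jl_ptls_t)` as the cast in `jl_safepoint_try_recruit` suggests:

```c
/* Hypothetical sketch, not code from this PR: how the collecting thread
 * might advertise the mark loop and participate in it. */
static void gc_mark_parallel(jl_ptls_t ptls)
{
    // Publish the entry point; threads woken by the safepoint master
    // pick it up in jl_safepoint_try_recruit.
    jl_atomic_store_release(&jl_gc_recruiting_location, (void*)gc_mark_loop);
    // The initiating thread works too, under the same enter/leave
    // protocol that keeps nworkers_marking accurate.
    jl_gc_mark_loop_enter(ptls);
    gc_mark_loop(ptls);
    jl_gc_mark_loop_leave(ptls);
    // Spin until some thread (possibly this one) becomes safepoint master
    // and observes that every worker has drained its queue.
    while (!jl_safepoint_master_end_marking(ptls))
        jl_cpu_pause();
    // Withdraw the advertisement before sweeping starts.
    jl_atomic_store_release(&jl_gc_recruiting_location, NULL);
}
```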
