Skip to content

Commit

Permalink
dispatch queue in CI and event loop fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed Feb 3, 2025
1 parent 8e95121 commit 2f20f2a
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 38 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ jobs:
run: |
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
chmod a+x builder
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}"
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_DISPATCH_QUEUE=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}"
macos-x64:
runs-on: macos-14-large # latest
Expand Down Expand Up @@ -274,7 +274,7 @@ jobs:
run: |
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
chmod a+x builder
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}" --config Debug
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_DISPATCH_QUEUE=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}" --config Debug
freebsd:
runs-on: ubuntu-24.04 # latest
Expand Down
141 changes: 105 additions & 36 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static struct aws_event_loop_vtable s_vtable = {
* queue to insure the tasks scheduled on the event loop task scheduler are executed in the correct order.
*
* Data Structures ******
* `scheudled_iteration_entry`: Each entry maps to an iteration we scheduled on Apple's dispatch queue. We lose control
* `scheduled_iteration_entry `: Each entry maps to an iteration we scheduled on Apple's dispatch queue. We lose control
* of the submitted block once scheduled to Apple's dispatch queue. Apple will keep its dispatch queue alive and
* increase its refcount on the dispatch queue for every entry we schedule an entry. Blocks scheduled for future
* execution on a dispatch queue will obtain a refcount to the Apple dispatch queue to insure the dispatch queue is not
Expand All @@ -82,7 +82,7 @@ static struct aws_event_loop_vtable s_vtable = {
* The data structure used to track the dispatch queue execution iteration (block). Each entry is associated with
* an run iteration scheduled on Apple Dispatch Queue.
*/
struct scheudled_iteration_entry {
struct scheduled_iteration_entry {
struct aws_allocator *allocator;
uint64_t timestamp;
struct aws_priority_queue_node priority_queue_node;
Expand All @@ -103,31 +103,45 @@ static int s_unlock_synced_data(struct aws_dispatch_loop *dispatch_loop) {
// Not sure why use 7 as the default queue size. Just follow what we used in task_scheduler.c
static const size_t DEFAULT_QUEUE_SIZE = 7;
static int s_compare_timestamps(const void *a, const void *b) {
uint64_t a_time = (*(struct scheudled_iteration_entry **)a)->timestamp;
uint64_t b_time = (*(struct scheudled_iteration_entry **)b)->timestamp;
uint64_t a_time = (*(struct scheduled_iteration_entry **)a)->timestamp;
uint64_t b_time = (*(struct scheduled_iteration_entry **)b)->timestamp;
return a_time > b_time; /* min-heap */
}

/*
* Allocates and returns a new memory alocated `scheduled_iteration_entry` struct
* All scheduled_iteration_entry structs must have `s_scheduled_iteration_entry_destroy()` called on them.
*/
static struct scheudled_iteration_entry *s_scheudled_iteration_entry_new(
static struct scheduled_iteration_entry *s_scheduled_iteration_entry_new(
struct aws_dispatch_loop *dispatch_loop,
uint64_t timestamp) {
struct scheudled_iteration_entry *entry =
aws_mem_calloc(dispatch_loop->allocator, 1, sizeof(struct scheudled_iteration_entry));
struct scheduled_iteration_entry *entry =
aws_mem_calloc(dispatch_loop->allocator, 1, sizeof(struct scheduled_iteration_entry));

entry->allocator = dispatch_loop->allocator;
entry->timestamp = timestamp;
entry->dispatch_loop = dispatch_loop;
aws_priority_queue_node_init(&entry->priority_queue_node);

AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
" DEBUG CREATING ITERATION ENTRY %p with timestamp %llu and allocator %p",
(void *)entry,
timestamp,
(void *)entry->allocator);

return entry;
}

/* Cleans up a `scheduled_iteration_entry` */
static void s_scheduled_iteration_entry_destroy(struct scheudled_iteration_entry *entry) {
static void s_scheduled_iteration_entry_destroy(struct scheduled_iteration_entry *entry) {
AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
" DEBUG DESTROYING ITERATION ENTRY: %p dispatch_loop: %p timestamp: %llu allocator: %p",
(void *)entry,
(void *)entry->dispatch_loop,
entry->timestamp,
(void *)entry->allocator);
entry->dispatch_loop = NULL;
aws_mem_release(entry->allocator, entry);
}
Expand All @@ -144,16 +158,24 @@ static bool s_should_schedule_iteration(
return true;
}

struct scheudled_iteration_entry **entry = NULL;
aws_priority_queue_top(scheduled_iterations, (void **)&entry);
struct scheduled_iteration_entry **entry_ptr = NULL;
aws_priority_queue_top(scheduled_iterations, (void **)&entry_ptr);
AWS_FATAL_ASSERT(entry_ptr != NULL);
struct scheduled_iteration_entry *entry = *entry_ptr;
AWS_FATAL_ASSERT(entry != NULL);

AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
"DEBUG s_should_schedule_iteration() comparing against %p from priority queue with timestamp %llu",
(void *)entry,
entry->timestamp);

// is the next scheduled iteration later than what we require?
return (*entry)->timestamp > proposed_iteration_time;
return entry->timestamp > proposed_iteration_time;
}

// Manually called to destroy an aws_event_loop
static void s_dispatch_event_loop_destroy(struct aws_event_loop *event_loop) {
// release dispatch loop
struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data;

// The scheduler should be cleaned up and zeroed out in s_dispatch_queue_destroy_task.
Expand Down Expand Up @@ -252,7 +274,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
&dispatch_loop->synced_data.scheduled_iterations,
alloc,
DEFAULT_QUEUE_SIZE,
sizeof(struct scheudled_iteration_entry *),
sizeof(struct scheduled_iteration_entry *),
&s_compare_timestamps)) {
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
Expand All @@ -279,27 +301,52 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(

static void s_dispatch_queue_destroy_task(void *context) {
struct aws_dispatch_loop *dispatch_loop = context;
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)dispatch_loop->base_loop);

s_lock_synced_data(dispatch_loop);
dispatch_loop->synced_data.is_destroying = true;
dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id();
dispatch_loop->synced_data.is_executing = true;

// swap the cross-thread tasks into task-local data
/*
* Swap tasks from cross_thread_tasks into local_cross_thread_tasks to cancel them as well as the tasks already
* in the scheduler.
*/
struct aws_linked_list local_cross_thread_tasks;
aws_linked_list_init(&local_cross_thread_tasks);
aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks);

/*
* Because this task was scheudled on the dispatch queue using `dispatch_async_and_wait_t()` we can be sure that
* any blocks running after it, and any scheduled iterations will occur AFTER we have cancelled all existing tasks
* in both the cross_thread_tasks and scheduler. It is safe at this point to NULL the dispatch_queue from all
* iteration blocks scheduled to run in the future.
*/
struct aws_array_list *scheduled_iterations_array = &dispatch_loop->synced_data.scheduled_iterations.container;
AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
"DEBUG ITERATION ENTRY COUNT AT DESTROY %zu",
aws_array_list_length(scheduled_iterations_array));
for (size_t i = 0; i < aws_array_list_length(scheduled_iterations_array); ++i) {
struct scheduled_iteration_entry **entry_ptr = NULL;
aws_array_list_get_at_ptr(scheduled_iterations_array, (void **)&entry_ptr, i);
struct scheduled_iteration_entry *entry = *entry_ptr;
// aws_array_list_get_at_ptr(scheduled_iterations_array, (void **)&entry, i);
if (entry->dispatch_loop) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "X X X X X X X X DEBUG NULLING ITERATION ENTRY %p", (void *)entry);
entry->dispatch_loop = NULL;
}
}

s_unlock_synced_data(dispatch_loop);
aws_task_scheduler_clean_up(&dispatch_loop->scheduler); /* Tasks in scheduler get cancelled */
while (!aws_linked_list_empty(&local_cross_thread_tasks)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks);
struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

// DEBUG WIP we need to iterate through the scheduled_iterations and set the service entry dispatch_loops to NULL
// struct aws_linked_list *scheduled_iterations = *dispatch_loop->synced_data.scheduled_iterations.container;

s_lock_synced_data(dispatch_loop);
dispatch_loop->synced_data.is_executing = false;
s_unlock_synced_data(dispatch_loop);

Expand All @@ -325,8 +372,6 @@ static void s_destroy(struct aws_event_loop *event_loop) {
* AFTER s_dispatch_queue_destroy_task() has executued.
*/
dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task);

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
}

// DEBUG WIP what is this and do we need to do anything here to have parity with other event loops?
Expand Down Expand Up @@ -384,9 +429,9 @@ static int s_stop(struct aws_event_loop *event_loop) {
* Apple dispatch queue.
*/
static void s_run_iteration(void *service_entry) {
struct scheudled_iteration_entry *entry = service_entry;
struct scheduled_iteration_entry *entry = service_entry;
struct aws_dispatch_loop *dispatch_loop = entry->dispatch_loop;

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "DEBUG RUNNING ITERATION ENTRY %p", (void *)entry);
/*
* A scheduled_iteration_entry can have been enqueued by Apple to run AFTER `s_dispatch_queue_destroy_task()` has
* been executed and the `aws_dispatch_loop` and parent `aws_event_loop` have been cleaned up. During the execution
Expand All @@ -395,7 +440,8 @@ static void s_run_iteration(void *service_entry) {
* determine whether this iteration is executing on an Apple dispatch queue that is no longer associated with an
* `aws_dispatch_loop` or an `aws_event_loop`.
*/
if (dispatch_loop == NULL) {
if (entry->dispatch_loop == NULL) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "DEBUG ENTRY DISPATCH LOOP IS NULL");
/*
* At this point, both the `aws_dispatch_loop` and `aws_event_loop` have been destroyed.
* Clean up the `scheduled_iteration_entry` and end the block to release its refcount on Apple's dispatch queue.
Expand Down Expand Up @@ -442,7 +488,7 @@ static void s_run_iteration(void *service_entry) {
dispatch_loop->synced_data.is_executing = false;

/* Remove the entry that's ending its iteration before further scheduling */
aws_priority_queue_remove(&dispatch_loop->synced_data.scheduled_iterations, entry, &entry->priority_queue_node);
aws_priority_queue_remove(&dispatch_loop->synced_data.scheduled_iterations, &entry, &entry->priority_queue_node);
/* destroy the completed service entry. */
s_scheduled_iteration_entry_destroy(entry);

Expand Down Expand Up @@ -486,29 +532,52 @@ static void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop
}

if (!s_should_schedule_iteration(&dispatch_loop->synced_data.scheduled_iterations, timestamp)) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "DEBUG s_should_schedule_iteration() returned FALSE");
return;
}
struct scheudled_iteration_entry *entry = s_scheudled_iteration_entry_new(dispatch_loop, timestamp);
aws_priority_queue_push_ref(&dispatch_loop->synced_data.scheduled_iterations, entry, &entry->priority_queue_node);

struct scheduled_iteration_entry *entry = s_scheduled_iteration_entry_new(dispatch_loop, timestamp);
aws_priority_queue_push_ref(
&dispatch_loop->synced_data.scheduled_iterations, (void *)&entry, &entry->priority_queue_node);

/**
* Apple dispatch queue uses automatic reference counting (ARC). If an iteration is scheduled to run in the future,
* the dispatch queue will persist until it is executed. Scheduling a block far into the future will keep the
* dispatch queue alive unnecessarily long, even after aws_event_loop and aws_dispatch_loop have been fully
* destroyed and cleaned up. To mitigate this, we ensure an iteration is scheduled no longer than 1 second in the
* future.
*/
uint64_t now_ns = 0;
aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns);
uint64_t delta = timestamp > now_ns ? timestamp - now_ns : 0;
/**
* The Apple dispatch queue uses automatic reference counting (ARC). If an iteration remains in the queue, it will
* persist until it is executed. Scheduling a block far into the future can keep the dispatch queue alive
* unnecessarily, even if the app has shutdown. To avoid this, Ensure an iteration is scheduled within a
* 1-second interval to prevent it from remaining in the Apple dispatch queue indefinitely.
*/
delta = aws_min_u64(delta, AWS_TIMESTAMP_NANOS);

if (delta == 0) {
// dispatch_after_f(0 , ...) is equivclient to dispatch_async_f(...) functionality wise, while
// dispatch_after_f(0 , ...) is not as optimal as dispatch_async_f(...)
// https://developer.apple.com/documentation/dispatch/1452878-dispatch_after_f
/*
* If the timestamp was set to execute immediately or in the past we schedule `s_run_iteration()` to run
* immediately using `dispatch_async_f()` which schedules a block to run on the dispatch queue in a FIFO order.
*/
dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration);

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, " DEBUG dispatch_async_f() entry: %p", (void *)entry);
} else {
dispatch_after_f(delta, dispatch_loop->dispatch_queue, entry, s_run_iteration);
/*
* If the timestamp is set to execute sometime in the future, we clamp the time to 1 second max, convert the
* time to the format dispatch queue expects, and then schedule `s_run_iteration()` to run in the future using
* `dispatch_after_f()`. `dispatch_after_f()` does not immediately place the block onto the dispatch queue but
* instead obtains a refcount of Apple's dispatch queue and then schedules onto it at the requested time. Any
* blocks scheduled using `dispatch_async_f()` or `dispatch_after_f()` with a closer dispatch time will be
* placed on the dispatch queue and execute in order.
*/
delta = aws_min_u64(delta, AWS_TIMESTAMP_NANOS);
dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delta);
dispatch_after_f(when, dispatch_loop->dispatch_queue, entry, s_run_iteration);

AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
" DEBUG dispatch_after_f() entry: %p now: %llu when: %llu",
(void *)entry,
now_ns,
(uint64_t)when);
}
}

Expand Down

0 comments on commit 2f20f2a

Please sign in to comment.