From 2f20f2a5e834f744dd34845f7693179bfd2ccfe6 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Mon, 3 Feb 2025 13:24:33 -0800 Subject: [PATCH] dispatch queue in CI and event loop fixes --- .github/workflows/ci.yml | 4 +- source/darwin/dispatch_queue_event_loop.c | 141 ++++++++++++++++------ 2 files changed, 107 insertions(+), 38 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c02cb7ae3..760f0d1cf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -243,7 +243,7 @@ jobs: run: | python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')" chmod a+x builder - ./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}" + ./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_DISPATCH_QUEUE=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}" macos-x64: runs-on: macos-14-large # latest @@ -274,7 +274,7 @@ jobs: run: | python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')" chmod a+x builder - ./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}" --config Debug + ./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_DISPATCH_QUEUE=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ 
matrix.sanitizers }}" --config Debug freebsd: runs-on: ubuntu-24.04 # latest diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index e6381f435..91e5f4f1a 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -66,7 +66,7 @@ static struct aws_event_loop_vtable s_vtable = { * queue to insure the tasks scheduled on the event loop task scheduler are executed in the correct order. * * Data Structures ****** - * `scheudled_iteration_entry`: Each entry maps to an iteration we scheduled on Apple's dispatch queue. We lose control + * `scheduled_iteration_entry`: Each entry maps to an iteration we scheduled on Apple's dispatch queue. We lose control * of the submitted block once scheduled to Apple's dispatch queue. Apple will keep its dispatch queue alive and * increase its refcount on the dispatch queue for every entry we schedule an entry. Blocks scheduled for future * execution on a dispatch queue will obtain a refcount to the Apple dispatch queue to insure the dispatch queue is not * @@ -82,7 +82,7 @@ static struct aws_event_loop_vtable s_vtable = { * The data structure used to track the dispatch queue execution iteration (block). Each entry is associated with * an run iteration scheduled on Apple Dispatch Queue. */ -struct scheudled_iteration_entry { +struct scheduled_iteration_entry { struct aws_allocator *allocator; uint64_t timestamp; struct aws_priority_queue_node priority_queue_node; @@ -103,8 +103,8 @@ static int s_unlock_synced_data(struct aws_dispatch_loop *dispatch_loop) { // Not sure why use 7 as the default queue size. 
Just follow what we used in task_scheduler.c static const size_t DEFAULT_QUEUE_SIZE = 7; static int s_compare_timestamps(const void *a, const void *b) { - uint64_t a_time = (*(struct scheudled_iteration_entry **)a)->timestamp; - uint64_t b_time = (*(struct scheudled_iteration_entry **)b)->timestamp; + uint64_t a_time = (*(struct scheduled_iteration_entry **)a)->timestamp; + uint64_t b_time = (*(struct scheduled_iteration_entry **)b)->timestamp; return a_time > b_time; /* min-heap */ } @@ -112,22 +112,36 @@ static int s_compare_timestamps(const void *a, const void *b) { * Allocates and returns a new memory alocated `scheduled_iteration_entry` struct * All scheduled_iteration_entry structs must have `s_scheduled_iteration_entry_destroy()` called on them. */ -static struct scheudled_iteration_entry *s_scheudled_iteration_entry_new( +static struct scheduled_iteration_entry *s_scheduled_iteration_entry_new( struct aws_dispatch_loop *dispatch_loop, uint64_t timestamp) { - struct scheudled_iteration_entry *entry = - aws_mem_calloc(dispatch_loop->allocator, 1, sizeof(struct scheudled_iteration_entry)); + struct scheduled_iteration_entry *entry = + aws_mem_calloc(dispatch_loop->allocator, 1, sizeof(struct scheduled_iteration_entry)); entry->allocator = dispatch_loop->allocator; entry->timestamp = timestamp; entry->dispatch_loop = dispatch_loop; aws_priority_queue_node_init(&entry->priority_queue_node); + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + " DEBUG CREATING ITERATION ENTRY %p with timestamp %llu and allocator %p", + (void *)entry, + timestamp, + (void *)entry->allocator); + return entry; } /* Cleans up a `scheduled_iteration_entry` */ -static void s_scheduled_iteration_entry_destroy(struct scheudled_iteration_entry *entry) { +static void s_scheduled_iteration_entry_destroy(struct scheduled_iteration_entry *entry) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + " DEBUG DESTROYING ITERATION ENTRY: %p dispatch_loop: %p timestamp: %llu allocator: %p", + (void *)entry, + 
(void *)entry->dispatch_loop, + entry->timestamp, + (void *)entry->allocator); entry->dispatch_loop = NULL; aws_mem_release(entry->allocator, entry); } @@ -144,16 +158,24 @@ static bool s_should_schedule_iteration( return true; } - struct scheudled_iteration_entry **entry = NULL; - aws_priority_queue_top(scheduled_iterations, (void **)&entry); + struct scheduled_iteration_entry **entry_ptr = NULL; + aws_priority_queue_top(scheduled_iterations, (void **)&entry_ptr); + AWS_FATAL_ASSERT(entry_ptr != NULL); + struct scheduled_iteration_entry *entry = *entry_ptr; + AWS_FATAL_ASSERT(entry != NULL); + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "DEBUG s_should_schedule_iteration() comparing against %p from priority queue with timestamp %llu", + (void *)entry, + entry->timestamp); // is the next scheduled iteration later than what we require? - return (*entry)->timestamp > proposed_iteration_time; + return entry->timestamp > proposed_iteration_time; } // Manually called to destroy an aws_event_loop static void s_dispatch_event_loop_destroy(struct aws_event_loop *event_loop) { - // release dispatch loop struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data; // The scheduler should be cleaned up and zeroed out in s_dispatch_queue_destroy_task. 
@@ -252,7 +274,7 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( &dispatch_loop->synced_data.scheduled_iterations, alloc, DEFAULT_QUEUE_SIZE, - sizeof(struct scheudled_iteration_entry *), + sizeof(struct scheduled_iteration_entry *), &s_compare_timestamps)) { AWS_LOGF_ERROR( AWS_LS_IO_EVENT_LOOP, @@ -279,17 +301,44 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue( static void s_dispatch_queue_destroy_task(void *context) { struct aws_dispatch_loop *dispatch_loop = context; + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)dispatch_loop->base_loop); s_lock_synced_data(dispatch_loop); dispatch_loop->synced_data.is_destroying = true; dispatch_loop->synced_data.current_thread_id = aws_thread_current_thread_id(); dispatch_loop->synced_data.is_executing = true; - // swap the cross-thread tasks into task-local data + /* + * Swap tasks from cross_thread_tasks into local_cross_thread_tasks to cancel them as well as the tasks already + * in the scheduler. + */ struct aws_linked_list local_cross_thread_tasks; aws_linked_list_init(&local_cross_thread_tasks); aws_linked_list_swap_contents(&dispatch_loop->synced_data.cross_thread_tasks, &local_cross_thread_tasks); + /* + * Because this task was scheduled on the dispatch queue using `dispatch_async_and_wait_f()` we can be sure that + * any blocks running after it, and any scheduled iterations will occur AFTER we have cancelled all existing tasks + * in both the cross_thread_tasks and scheduler. It is safe at this point to NULL the dispatch_queue from all + * iteration blocks scheduled to run in the future. 
+ */ + struct aws_array_list *scheduled_iterations_array = &dispatch_loop->synced_data.scheduled_iterations.container; + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "DEBUG ITERATION ENTRY COUNT AT DESTROY %zu", + aws_array_list_length(scheduled_iterations_array)); + for (size_t i = 0; i < aws_array_list_length(scheduled_iterations_array); ++i) { + struct scheduled_iteration_entry **entry_ptr = NULL; + aws_array_list_get_at_ptr(scheduled_iterations_array, (void **)&entry_ptr, i); + struct scheduled_iteration_entry *entry = *entry_ptr; + // aws_array_list_get_at_ptr(scheduled_iterations_array, (void **)&entry, i); + if (entry->dispatch_loop) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "X X X X X X X X DEBUG NULLING ITERATION ENTRY %p", (void *)entry); + entry->dispatch_loop = NULL; + } + } + + s_unlock_synced_data(dispatch_loop); aws_task_scheduler_clean_up(&dispatch_loop->scheduler); /* Tasks in scheduler get cancelled */ while (!aws_linked_list_empty(&local_cross_thread_tasks)) { struct aws_linked_list_node *node = aws_linked_list_pop_front(&local_cross_thread_tasks); @@ -297,9 +346,7 @@ static void s_dispatch_queue_destroy_task(void *context) { task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); } - // DEBUG WIP we need to iterate through the scheduled_iterations and set the service entry dispatch_loops to NULL - // struct aws_linked_list *scheduled_iterations = *dispatch_loop->synced_data.scheduled_iterations.container; - + s_lock_synced_data(dispatch_loop); dispatch_loop->synced_data.is_executing = false; s_unlock_synced_data(dispatch_loop); @@ -325,8 +372,6 @@ static void s_destroy(struct aws_event_loop *event_loop) { * AFTER s_dispatch_queue_destroy_task() has executued. 
*/ dispatch_async_and_wait_f(dispatch_loop->dispatch_queue, dispatch_loop, s_dispatch_queue_destroy_task); - - AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop); } // DEBUG WIP what is this and do we need to do anything here to have parity with other event loops? @@ -384,9 +429,9 @@ static int s_stop(struct aws_event_loop *event_loop) { * Apple dispatch queue. */ static void s_run_iteration(void *service_entry) { - struct scheudled_iteration_entry *entry = service_entry; + struct scheduled_iteration_entry *entry = service_entry; struct aws_dispatch_loop *dispatch_loop = entry->dispatch_loop; - + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "DEBUG RUNNING ITERATION ENTRY %p", (void *)entry); /* * A scheduled_iteration_entry can have been enqueued by Apple to run AFTER `s_dispatch_queue_destroy_task()` has * been executed and the `aws_dispatch_loop` and parent `aws_event_loop` have been cleaned up. During the execution @@ -395,7 +440,8 @@ static void s_run_iteration(void *service_entry) { * determine whether this iteration is executing on an Apple dispatch queue that is no longer associated with an * `aws_dispatch_loop` or an `aws_event_loop`. */ - if (dispatch_loop == NULL) { + if (entry->dispatch_loop == NULL) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "DEBUG ENTRY DISPATCH LOOP IS NULL"); /* * At this point, both the `aws_dispatch_loop` and `aws_event_loop` have been destroyed. * Clean up the `scheduled_iteration_entry` and end the block to release its refcount on Apple's dispatch queue. 
@@ -442,7 +488,7 @@ static void s_run_iteration(void *service_entry) { dispatch_loop->synced_data.is_executing = false; /* Remove the entry that's ending its iteration before further scheduling */ - aws_priority_queue_remove(&dispatch_loop->synced_data.scheduled_iterations, entry, &entry->priority_queue_node); + aws_priority_queue_remove(&dispatch_loop->synced_data.scheduled_iterations, &entry, &entry->priority_queue_node); /* destroy the completed service entry. */ s_scheduled_iteration_entry_destroy(entry); @@ -486,29 +532,52 @@ static void s_try_schedule_new_iteration(struct aws_dispatch_loop *dispatch_loop } if (!s_should_schedule_iteration(&dispatch_loop->synced_data.scheduled_iterations, timestamp)) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "DEBUG s_should_schedule_iteration() returned FALSE"); return; } - struct scheudled_iteration_entry *entry = s_scheudled_iteration_entry_new(dispatch_loop, timestamp); - aws_priority_queue_push_ref(&dispatch_loop->synced_data.scheduled_iterations, entry, &entry->priority_queue_node); + struct scheduled_iteration_entry *entry = s_scheduled_iteration_entry_new(dispatch_loop, timestamp); + aws_priority_queue_push_ref( + &dispatch_loop->synced_data.scheduled_iterations, (void *)&entry, &entry->priority_queue_node); + + /** + * Apple dispatch queue uses automatic reference counting (ARC). If an iteration is scheduled to run in the future, + * the dispatch queue will persist until it is executed. Scheduling a block far into the future will keep the + * dispatch queue alive unnecessarily long, even after aws_event_loop and aws_dispatch_loop have been fully + * destroyed and cleaned up. To mitigate this, we ensure an iteration is scheduled no longer than 1 second in the + * future. + */ uint64_t now_ns = 0; aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns); uint64_t delta = timestamp > now_ns ? timestamp - now_ns : 0; - /** - * The Apple dispatch queue uses automatic reference counting (ARC). 
If an iteration remains in the queue, it will - * persist until it is executed. Scheduling a block far into the future can keep the dispatch queue alive - * unnecessarily, even if the app has shutdown. To avoid this, Ensure an iteration is scheduled within a - * 1-second interval to prevent it from remaining in the Apple dispatch queue indefinitely. - */ - delta = aws_min_u64(delta, AWS_TIMESTAMP_NANOS); if (delta == 0) { - // dispatch_after_f(0 , ...) is equivclient to dispatch_async_f(...) functionality wise, while - // dispatch_after_f(0 , ...) is not as optimal as dispatch_async_f(...) - // https://developer.apple.com/documentation/dispatch/1452878-dispatch_after_f + /* + * If the timestamp was set to execute immediately or in the past we schedule `s_run_iteration()` to run + * immediately using `dispatch_async_f()` which schedules a block to run on the dispatch queue in a FIFO order. + */ dispatch_async_f(dispatch_loop->dispatch_queue, entry, s_run_iteration); + + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, " DEBUG dispatch_async_f() entry: %p", (void *)entry); } else { - dispatch_after_f(delta, dispatch_loop->dispatch_queue, entry, s_run_iteration); + /* + * If the timestamp is set to execute sometime in the future, we clamp the time to 1 second max, convert the + * time to the format dispatch queue expects, and then schedule `s_run_iteration()` to run in the future using + * `dispatch_after_f()`. `dispatch_after_f()` does not immediately place the block onto the dispatch queue but + * instead obtains a refcount of Apple's dispatch queue and then schedules onto it at the requested time. Any + * blocks scheduled using `dispatch_async_f()` or `dispatch_after_f()` with a closer dispatch time will be + * placed on the dispatch queue and execute in order. 
+ */ + delta = aws_min_u64(delta, AWS_TIMESTAMP_NANOS); + dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delta); + dispatch_after_f(when, dispatch_loop->dispatch_queue, entry, s_run_iteration); + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + " DEBUG dispatch_after_f() entry: %p now: %llu when: %llu", + (void *)entry, + now_ns, + (uint64_t)when); } }