Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outperform the way to consume tasks by domains #41

Merged
merged 2 commits into from
Sep 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions lib/miou.ml
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,25 @@ type pool = {
; fail: bool ref
; working_counter: int ref
; domains_counter: int ref
; tasks_is_empty: bool Atomic.t
}
(* NOTE(dinosaure): when we create the pool, we do a copy (eg.
[{ pool with ... }]) to includes spawned domains. To continue sharing mutable
values when copying, we need to use [ref] rather than [mutable]. *)

(* NOTE(dinosaure): The [tasks_is_empty] variable is used to determine whether
domains should re-observe the shared heap (via [worker]) to consume a task
intended for them or continue with the tasks they have to do according to
their internal heap.

This means that we don't have to use the system mutex too much and that the
domain only resynchronises with the shared heap when necessary - i.e. when
there is a task waiting to be assigned to a domain.

The value is atomic because it must be observed by several domains. Although
its modification (Atomic.set) is protected by the mutex, its observation is
not (so a domain could observe ‘while’ the variable is being modified). *)

let get_domain_from_uid pool ~uid =
List.find (fun domain -> Domain_uid.equal domain.uid uid) pool.domains

Expand Down Expand Up @@ -380,12 +394,13 @@ module Domain = struct
let add_into_domain = add_into_domain

let add_into_pool pool elt =
Mutex.lock pool.mutex;
let direction =
match elt with
| Pool_cancel _ -> Miou_sequence.Left
| _ -> Miou_sequence.Right
in
Mutex.lock pool.mutex;
Atomic.set pool.tasks_is_empty false;
Miou_sequence.add direction pool.tasks elt;
Condition.broadcast pool.condition_pending_work;
Mutex.unlock pool.mutex;
Expand Down Expand Up @@ -795,7 +810,7 @@ module Domain = struct
in
Miou_sequence.iter_node ~f:apply domain.hooks

let run pool (domain : domain) =
let rec run pool (domain : domain) =
run_hooks domain;
match Heapq.extract_min_exn domain.tasks with
| exception Heapq.Empty ->
Expand All @@ -806,7 +821,9 @@ module Domain = struct
m "[%a] does %a" Domain_uid.pp domain.uid _pp_domain_elt elt);
once pool domain elt;
if system_events_suspended domain then
unblock_awaits_with_system_events pool domain
unblock_awaits_with_system_events pool domain;
if Heapq.is_empty domain.tasks = false && Atomic.get pool.tasks_is_empty
then run pool domain

let self () =
let { uid; _ } = Effect.perform Self_domain in
Expand Down Expand Up @@ -886,6 +903,7 @@ module Pool = struct
done;
if !(pool.stop) then raise_notrace Exit;
transfer_all_tasks pool domain;
Atomic.set pool.tasks_is_empty (Miou_sequence.is_empty pool.tasks);
incr pool.working_counter;
Mutex.unlock pool.mutex;
Domain.run pool domain;
Expand Down Expand Up @@ -956,6 +974,7 @@ module Pool = struct
; domains= []
; dom0
; to_dom0= Queue.create ()
; tasks_is_empty= Atomic.make true
}
in
let clatch = Clatch.create domains_counter in
Expand Down