From b8b128c8b00801878d6a72531e6bb128c25dad59 Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Sat, 21 Sep 2024 17:43:29 +0200 Subject: [PATCH 1/2] Fallback to the worker (which introspects the shared queue between domains) only when it's needed. On benchmarks, the mutex (system) has a huge cost. This patch add a new atomic into the pool to set to false when the shared queue is not empty. In this case, all domains fallback to the worker function to consume all pending tasks available into the shared queue. Otherwise, domains just consume their own internal queues until they are empty. We keep the behavior that when something appears into the shared queue, domains (all domains) introspects this shared queue in the exclusive way (via the mutex). The test30 shows the case of a cancellation across domains and where one of our domains should consume this task from the shared queue and do, as soon as possible, the cancellation. --- lib/miou.ml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/miou.ml b/lib/miou.ml index 4ec86fc..ec8c099 100644 --- a/lib/miou.ml +++ b/lib/miou.ml @@ -231,6 +231,7 @@ type pool = { ; fail: bool ref ; working_counter: int ref ; domains_counter: int ref + ; tasks_is_empty: bool Atomic.t } (* NOTE(dinosaure): when we create the pool, we do a copy (eg. [{ pool with ... }]) to includes spawned domains. To continue sharing mutable @@ -380,12 +381,13 @@ module Domain = struct let add_into_domain = add_into_domain let add_into_pool pool elt = - Mutex.lock pool.mutex; let direction = match elt with | Pool_cancel _ -> Miou_sequence.Left | _ -> Miou_sequence.Right in + Mutex.lock pool.mutex; + Atomic.set pool.tasks_is_empty false; Miou_sequence.add direction pool.tasks elt; Condition.broadcast pool.condition_pending_work; Mutex.unlock pool.mutex; @@ -795,7 +797,7 @@ module Domain = struct in Miou_sequence.iter_node ~f:apply domain.hooks - let run pool (domain : domain) = + let rec run pool (domain : domain) = run_hooks domain; match Heapq.extract_min_exn domain.tasks with | exception Heapq.Empty -> @@ -806,7 +808,9 @@ module Domain = struct m "[%a] does %a" Domain_uid.pp domain.uid _pp_domain_elt elt); once pool domain elt; if system_events_suspended domain then - unblock_awaits_with_system_events pool domain + unblock_awaits_with_system_events pool domain; + if Heapq.is_empty domain.tasks = false && Atomic.get pool.tasks_is_empty + then run pool domain let self () = let { uid; _ } = Effect.perform Self_domain in @@ -886,6 +890,7 @@ module Pool = struct done; if !(pool.stop) then raise_notrace Exit; transfer_all_tasks pool domain; + Atomic.set pool.tasks_is_empty (Miou_sequence.is_empty pool.tasks); incr pool.working_counter; Mutex.unlock pool.mutex; Domain.run pool domain; @@ -956,6 +961,7 @@ module Pool = struct ; domains= [] ; dom0 ; to_dom0= Queue.create () + ; tasks_is_empty= Atomic.make true } in let clatch = Clatch.create domains_counter in From 68558e1d433bc0670e1cb8886a8bf6d7d6056a41 Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Sat, 21 Sep 2024 17:54:13 +0200 Subject: [PATCH 2/2] Internally documents pool.tasks_is_empty --- lib/miou.ml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lib/miou.ml b/lib/miou.ml index ec8c099..2f991b5 100644 --- a/lib/miou.ml +++ b/lib/miou.ml @@ -237,6 +237,19 @@ type pool = { [{ pool with ... }]) to includes spawned domains. To continue sharing mutable values when copying, we need to use [ref] rather than [mutable]. *) +(* NOTE(dinosaure): The [tasks_is_empty] variable is used to determine whether + domains should re-observe the shared heap (via [worker]) to consume a task + intended for them or continue with the tasks they have to do according to + their internal heap. + + This means that we don't have to use the system mutex too much and that the + domain only resynchronises with the shared heap when necessary - i.e. when + there is a task waiting to be assigned to a domain. + + The value is atomic because it must be observed by several domains. Although + its modification (Atomic.set) is protected by the mutex, its observation is + not (so a domain could observe ‘while’ the variable is being modified). *) + let get_domain_from_uid pool ~uid = List.find (fun domain -> Domain_uid.equal domain.uid uid) pool.domains