diff --git a/lib/miou.ml b/lib/miou.ml index 4ec86fc..2f991b5 100644 --- a/lib/miou.ml +++ b/lib/miou.ml @@ -231,11 +231,25 @@ type pool = { ; fail: bool ref ; working_counter: int ref ; domains_counter: int ref + ; tasks_is_empty: bool Atomic.t } (* NOTE(dinosaure): when we create the pool, we do a copy (eg. [{ pool with ... }]) to includes spawned domains. To continue sharing mutable values when copying, we need to use [ref] rather than [mutable]. *) +(* NOTE(dinosaure): The [tasks_is_empty] variable is used to determine whether + domains should re-observe the shared heap (via [worker]) to consume a task + intended for them or continue with the tasks they have to do according to + their internal heap. + + This means that we don't have to use the system mutex too much and that the + domain only resynchronises with the shared heap when necessary - i.e. when + there is a task waiting to be assigned to a domain. + + The value is atomic because it must be observed by several domains. Although + its modification (Atomic.set) is protected by the mutex, its observation is + not (so a domain could observe ‘while’ the variable is being modified). *) + let get_domain_from_uid pool ~uid = List.find (fun domain -> Domain_uid.equal domain.uid uid) pool.domains @@ -380,12 +394,13 @@ module Domain = struct let add_into_domain = add_into_domain let add_into_pool pool elt = - Mutex.lock pool.mutex; let direction = match elt with | Pool_cancel _ -> Miou_sequence.Left | _ -> Miou_sequence.Right in + Mutex.lock pool.mutex; + Atomic.set pool.tasks_is_empty false; Miou_sequence.add direction pool.tasks elt; Condition.broadcast pool.condition_pending_work; Mutex.unlock pool.mutex; @@ -795,7 +810,7 @@ module Domain = struct in Miou_sequence.iter_node ~f:apply domain.hooks - let run pool (domain : domain) = + let rec run pool (domain : domain) = run_hooks domain; match Heapq.extract_min_exn domain.tasks with | exception Heapq.Empty -> @@ -806,7 +821,9 @@ module Domain = struct m "[%a] does %a" Domain_uid.pp domain.uid _pp_domain_elt elt); once pool domain elt; if system_events_suspended domain then - unblock_awaits_with_system_events pool domain + unblock_awaits_with_system_events pool domain; + if Heapq.is_empty domain.tasks = false && Atomic.get pool.tasks_is_empty + then run pool domain let self () = let { uid; _ } = Effect.perform Self_domain in @@ -886,6 +903,7 @@ module Pool = struct done; if !(pool.stop) then raise_notrace Exit; transfer_all_tasks pool domain; + Atomic.set pool.tasks_is_empty (Miou_sequence.is_empty pool.tasks); incr pool.working_counter; Mutex.unlock pool.mutex; Domain.run pool domain; @@ -956,6 +974,7 @@ module Pool = struct ; domains= [] ; dom0 ; to_dom0= Queue.create () + ; tasks_is_empty= Atomic.make true } in let clatch = Clatch.create domains_counter in