Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel map #40

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,31 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =

let parallel_scan pool op elements =

let n = Array.length elements in
let p = (Array.length pool.domains) + 1 in
let prefix_s = Array.copy elements in

let scan_part op elements prefix_sum start finish =
assert (Array.length elements > (finish - start));
assert (Array.length elements >= (finish - start));
for i = (start + 1) to finish do
prefix_sum.(i) <- op prefix_sum.(i - 1) elements.(i)
done
in

if (n <= p) || (p = 1) then begin
(* Performs a sequential scan when number of elements is less than or equal
to the pool size or if the number of domains is one *)
scan_part op elements prefix_s 0 (n - 1);
prefix_s
end
else begin

let add_offset op prefix_sum offset start finish =
assert (Array.length prefix_sum > (finish - start));
assert (Array.length prefix_sum >= (finish - start));
for i = start to finish do
prefix_sum.(i) <- op offset prefix_sum.(i)
done
in
let n = Array.length elements in
let p = (Array.length pool.domains) + 1 in
let prefix_s = Array.copy elements in

parallel_for pool ~chunk_size:1 ~start:0 ~finish:(p - 1)
~body:(fun i ->
Expand All @@ -148,5 +158,14 @@ let parallel_scan pool op elements =
let offset = prefix_s.(s - 1) in
add_offset op prefix_s offset s e
);

prefix_s
end


let parallel_map pool ?(chunk_size=0) f arr =
let len = Array.length arr in
let res = Array.make len (f arr.(0)) in
parallel_for ~chunk_size ~start:1 ~finish:(len - 1)
~body:(fun i ->
res.(i) <- (f arr.(i))) pool;
res
6 changes: 6 additions & 0 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array
* intermediate values. The reduce operations are performed in an arbitrary
* order and the reduce function needs to be commutative and associative in
* order to obtain a deterministic result *)

val parallel_map : pool -> ?chunk_size:int -> ('a -> 'b ) -> 'a array
-> 'b array
(** [parallel_map p c f arr] is similar to [Array.map], but runs the computation
* in parallel. The [chunk_size] parameter is optional, when not provided
* defaults to the chunk size computed in [parallel_for] *)
29 changes: 24 additions & 5 deletions test/test_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,24 @@ let sum_sequence pool chunk_size init = fun () ->

(* Parallel scan *)

let prefix_sum pool = fun () ->
let prefix_sum pool n = fun () ->
let prefix_s l = List.rev (List.fold_left (fun a y -> match a with
| [] -> [y]
| x::_ -> (x+y)::a) [] l) in
let arr = Array.make 1000 1 in
let arr = Array.make n 1 in
let v1 = Task.parallel_scan pool (+) arr in
let ls = Array.to_list arr in
let v2 = prefix_s ls in
assert (v1 = Array.of_list v2)

let parallel_map pool chunk_size = fun () ->
let arr = Array.init 1000 (fun i -> i) in
let res_1 =
Task.parallel_map pool ~chunk_size (fun x -> x + 3) arr in
let res_2 = Array.map (fun x -> x + 3) arr in
assert (res_1 = res_2)

let () =
let pool = Task.setup_pool ~num_additional_domains:3 in
let run_all pool = fun () ->
modify_arr pool 0 ();
modify_arr pool 25 ();
modify_arr pool 100 ();
Expand All @@ -51,6 +56,20 @@ let () =
sum_sequence pool 1 10 ();
sum_sequence pool 100 10 ();
sum_sequence pool 100 100 ();
prefix_sum pool ();
prefix_sum pool 1000 ();
prefix_sum pool 3 ();
parallel_map pool 0 ();
parallel_map pool 10 ();
parallel_map pool 100 ()

let () =
let pool = Task.setup_pool ~num_additional_domains:3 in
run_all pool ();
Task.teardown_pool pool;
let pool2 = Task.setup_pool ~num_additional_domains:0 in
run_all pool2 ();
Task.teardown_pool pool2;
let pool3 = Task.setup_pool ~num_additional_domains:31 in
run_all pool3 ();
Task.teardown_pool pool3;
print_endline "ok"