
Commit

added cluster manager for mpi
amitmurthy committed Jan 9, 2015
1 parent 37f5254 commit fccdb09
Showing 10 changed files with 192 additions and 13 deletions.
44 changes: 44 additions & 0 deletions README.md
@@ -30,6 +30,50 @@ able to run the MPI job as expected, e.g., with

mpirun -np 3 julia 01-hello.jl

## Using MPI and Julia parallel constructs together

Making MPI calls from a Julia cluster requires MPIManager, a cluster manager that starts the Julia
workers using `mpirun`.

An example is provided in `examples/05-juliacman.jl`.
The Julia master process is NOT part of the MPI cluster. All the workers
started via MPIManager are part of the MPI cluster.

`MPIManager(;np=Sys.CPU_CORES, launch_cmd=false, output_filename=tempname(), launch_timeout=60.0)`

If not specified, `launch_cmd` defaults to `mpirun -np $np --output-filename $output_filename julia --worker`.
If a custom `launch_cmd` is specified, it must also include `--output-filename`, and its value must exactly
match the `output_filename` argument, since the Julia cluster setup requires access to the STDOUT of each worker.
Also note that `--worker` MUST be passed to the Julia processes started via `mpirun`.
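
For example, a custom launch command might be constructed as follows (a minimal sketch; the worker count of 8 and the reuse of `tempname()` are illustrative assumptions):

```
# Illustrative sketch: keep --output-filename in sync with the output_filename
# keyword, and always pass --worker to the julia processes started by mpirun.
using MPI
fname = tempname()
manager = MPIManager(np=8,
                     launch_cmd=`mpirun -np 8 --output-filename $fname julia --worker`,
                     output_filename=fname)
```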

The following lines are typically required on the Julia master process to support both Julia and MPI parallel constructs:

```
# to import MPIManager
using MPI
# specify the number of MPI workers, launch cmd, etc.
manager = MPIManager(np=4)
# start mpi workers and add them as julia workers too.
addprocs(manager)
# load module MPI on all newly launched workers
@everywhere using MPI
# calls MPI.Init() on all workers and sets up atexit() handlers to call MPI.Finalize()
mpi_init(manager)
```

To execute code containing MPI calls on all workers, use `@mpi_do`. For example,

`@mpi_do include("01-hello.jl")` loads and runs `01-hello.jl` on the MPI workers only (not on the master process).

`examples/05-juliacman.jl` is a simple example of calling MPI functions on all workers
interspersed with Julia parallel methods.
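
As a quick inline sketch (assuming the manager setup shown above has already been run), any expression can be executed on every MPI worker:

```
# Each MPI worker prints its own rank; nothing runs on the master process.
@mpi_do println("rank $(MPI.Comm_rank(MPI.COMM_WORLD)) of $(MPI.Comm_size(MPI.COMM_WORLD))")
```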



[Julia]: http://julialang.org/
[MPI]: http://www.mpi-forum.org/
[mpi4py]: http://mpi4py.scipy.org
1 change: 1 addition & 0 deletions REQUIRE
@@ -1 +1,2 @@
BinDeps
Compat
4 changes: 2 additions & 2 deletions examples/01-hello.jl
@@ -1,15 +1,15 @@
import MPI

function main()
-MPI.Init()
+(myid() == 1) && MPI.Init()

comm = MPI.COMM_WORLD

println("Hello world, I am $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")

MPI.Barrier(comm)

-MPI.Finalize()
+(myid() == 1) && MPI.Finalize()
end

main()
6 changes: 3 additions & 3 deletions examples/02-broadcast.jl
@@ -1,7 +1,7 @@
import MPI

function main()
-MPI.Init()
+(myid() == 1) && MPI.Init()

comm = MPI.COMM_WORLD

@@ -27,7 +27,7 @@ function main()
@printf("[%02d] A:%s\n", MPI.Comm_rank(comm), A)

if MPI.Comm_rank(comm) == root
B = {"foo" => "bar"}
B = Dict("foo" => "bar")
else
B = Nothing
end
@@ -45,7 +45,7 @@ function main()
f = MPI.bcast(f, root, comm)
@printf("[%02d] f(3):%d\n", MPI.Comm_rank(comm), f(3))

-MPI.Finalize()
+(myid() == 1) && MPI.Finalize()
end

main()
4 changes: 2 additions & 2 deletions examples/03-reduce.jl
@@ -1,7 +1,7 @@
import MPI

function main()
-MPI.Init()
+(myid() == 1) && MPI.Init()

comm = MPI.COMM_WORLD

@@ -16,7 +16,7 @@ function main()
@printf("sum of ranks: %s\n", sr)
end

-MPI.Finalize()
+(myid() == 1) && MPI.Finalize()
end

main()
6 changes: 3 additions & 3 deletions examples/04-sendrecv.jl
@@ -1,7 +1,7 @@
import MPI

function main()
-MPI.Init()
+(myid() == 1) && MPI.Init()

comm = MPI.COMM_WORLD

@@ -30,8 +30,8 @@ function main()
println("$rank: Receiving $src -> $rank = $recv_mesg")

MPI.Barrier(comm)
-MPI.Finalize()

+(myid() == 1) && MPI.Finalize()
end

main()
35 changes: 35 additions & 0 deletions examples/05-juliacman.jl
@@ -0,0 +1,35 @@
using MPI

manager=MPIManager(np=4)
addprocs(manager)

@everywhere using MPI
mpi_init(manager)

println("Running 01-hello as part of a Julia cluster")
@mpi_do include("01-hello.jl")

#Interspersed julia parallel call
nheads = @parallel (+) for i=1:10^8
int(rand(Bool))
end
println("@parallel nheads $nheads")

println("Running 02-broadcast as part of a Julia cluster")
@mpi_do include("02-broadcast.jl")

M = [rand(10,10) for i=1:10]
pmap(svd, M)
println("pmap successful")

println("Running 03-reduce as part of a Julia cluster")
@mpi_do include("03-reduce.jl")

pids = [remotecall_fetch(p, myid) for p in workers()]
println("julia pids $pids")

println("Running 04-sendrecv as part of a Julia cluster")
@mpi_do include("04-sendrecv.jl")

exit()

3 changes: 3 additions & 0 deletions src/MPI.jl
@@ -1,6 +1,9 @@
module MPI

using Compat

include("../deps/src/compile-time.jl")
include("mpi-base.jl")
include("cman.jl")

end
97 changes: 97 additions & 0 deletions src/cman.jl
@@ -0,0 +1,97 @@
import Base.launch, Base.manage
export MPIManager, mpi_init, launch, manage, @mpi_do

type MPIManager <: ClusterManager
np::Integer
launch_cmd::Cmd
output_filename::String
launch_timeout::Integer
map_mpi_julia::Dict
map_julia_mpi::Dict
l_socket
l_path

function MPIManager(;np=Sys.CPU_CORES, launch_cmd=false, output_filename=tempname(), launch_timeout=60.0)
if launch_cmd == false
launch_cmd = `mpirun -np $np --output-filename $output_filename julia --worker`
end
l_path = tempname()
l_socket = listen(l_path)

new(np, launch_cmd, output_filename, launch_timeout, Dict{Int, Int}(), Dict{Int, Int}(), l_socket, l_path)
end
end

function launch(manager::MPIManager, params::Dict, instances_arr::Array, c::Condition)
try
out, proc = open(detach(manager.launch_cmd))

# Get list of files with the same prefix as output_filename
output_dir = dirname(manager.output_filename)

t0=time()
allf = []
while (time() - t0) < manager.launch_timeout
allf = filter(x->startswith(x, basename(manager.output_filename)), readdir(output_dir))
(length(allf) == manager.np) && break
sleep(1.0)
end

@assert length(allf) == manager.np
for f in allf
config = WorkerConfig()
io, proc = open(`tail -f $output_dir/$f`)
finalizer(proc, (x)->kill(x))
config.io = io
config.userdata = proc
push!(instances_arr, config)
notify(c)
end
catch e
println("Error in MPI launch $e")
rethrow(e)
end

# @everywhere using MPI

end

function manage(manager::MPIManager, id::Integer, config::WorkerConfig, op::Symbol)
if op == :finalize
# if !isnull(config.io)
# close(config.io)
# end
elseif op == :interrupt
warn("Interrupting MPI workers is currently unsupported")
end
end

macro mpi_do(ex)
quote
@sync begin
for p in workers()
@spawnat p $ex
end
end
end
end


function mpi_init(manager::MPIManager)
refs=cell(nworkers())
for (i,p) in enumerate(workers())
rr = remotecall(p, ()-> begin
MPI.Init()
atexit(MPI.Finalize)
MPI.Comm_rank(MPI.COMM_WORLD)
end)
refs[i] = (rr, p)
end

for i in 1:nworkers()
mpi_id = fetch(refs[i][1])
manager.map_mpi_julia[mpi_id] = refs[i][2]
manager.map_julia_mpi[refs[i][2]] = mpi_id
end
end

5 changes: 2 additions & 3 deletions src/mpi-base.jl
@@ -3,8 +3,7 @@ typealias MPIDatatype Union(Char,
Uint64,
Float32, Float64, Complex64, Complex128)

-const datatypes =
-{Char => MPI_WCHAR,
+const datatypes = Dict{DataType, Any} (Char => MPI_WCHAR,
Int8 => MPI_INT8_T,
Uint8 => MPI_UINT8_T,
Int16 => MPI_INT16_T,
@@ -16,7 +15,7 @@ const datatypes =
Float32 => MPI_REAL4,
Float64 => MPI_REAL8,
Complex64 => MPI_COMPLEX8,
-Complex128 => MPI_COMPLEX16}
+Complex128 => MPI_COMPLEX16)

type Comm
val::Cint
