Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC : Added interrupt_workers #3819

Merged
merged 1 commit into from
Aug 2, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,7 @@ export
addprocs,
ClusterManager,
fetch,
interrupt,
isready,
myid,
nprocs,
Expand Down
134 changes: 103 additions & 31 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ function send_msg_now(s::IO, kind, args...)
send_msg_unknown(s, kind, args)
end

abstract ClusterManager

type Worker
host::ByteString
port::Uint16
Expand All @@ -94,6 +96,9 @@ type Worker
add_msgs::Array{Any,1}
id::Int
gcflag::Bool
cman::ClusterManager
cman_ctxt
ospid

Worker(host::String, port::Integer, sock::TcpSocket, id::Int) =
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false)
Expand Down Expand Up @@ -200,15 +205,22 @@ function add_workers(pg::ProcessGroup, w::Array{Any,1})
# NOTE: currently only node 1 can add new nodes, since nobody else
# has the full list of address:port
assert(LPROC.id == 1)
for i=1:length(w)
numw = length(w)
for i=1:numw
w[i].id = get_next_pid()
register_worker(w[i])
create_message_handler_loop(w[i].socket)

cman = w[i].cman
if contains(names(cman), :worker_cb) && isdefined(cman, :worker_cb)
cman.worker_cb(:register, cman, w[i].cman_ctxt, w[i].id, i)
end
end
all_locs = map(x -> isa(x, Worker) ? (x.host,x.port, x.id) : ("", 0, x.id), pg.workers)
for i=1:length(w)
for i=1:numw
send_msg_now(w[i], :join_pgrp, w[i].id, all_locs)
end

[w[i].id for i in 1:length(w)]
end

Expand Down Expand Up @@ -306,7 +318,14 @@ deregister_worker(pid) = deregister_worker(PGRP, pid)
function deregister_worker(pg, pid)
pg.workers = filter(x -> !(x.id == pid), pg.workers)
w = delete!(map_pid_wrkr, pid, nothing)
if isa(w, Worker) delete!(map_sock_wrkr, w.socket) end
if isa(w, Worker)
delete!(map_sock_wrkr, w.socket)

# Notify the cluster manager of this workers death
if (myid() == 1) && contains(names(w.cman), :worker_cb) && isdefined(w.cman, :worker_cb)
w.cman.worker_cb(:deregister, w.cman, w.cman_ctxt, w.id)
end
end
add!(map_del_wrkr, pid)

# delete this worker from our RemoteRef client sets
Expand Down Expand Up @@ -778,7 +797,7 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
try
while true
msg = deserialize(sock)
#println("got msg: ",msg)
# println("got msg: ",msg)
# handle message
if is(msg, :call)
id = deserialize(sock)
Expand Down Expand Up @@ -834,6 +853,9 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
register_worker(Worker("", 0, sock, 1))
register_worker(LPROC)

#send back env info to pid 1. Currently only ospid
send_msg(sock, :worker_info, getpid())

for (rhost, rport, rpid) in locs
if (rpid < self_pid) && (!(rpid == 1))
# Connect to them
Expand All @@ -847,7 +869,12 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
continue
end
end
elseif is(msg, :worker_info)
w = map_sock_wrkr[sock]
ospid = deserialize(sock)
w.ospid = ospid
end

end # end of while
catch e
iderr = worker_id_from_socket(sock)
Expand Down Expand Up @@ -928,39 +955,39 @@ function start_cluster_workers(n, config)

# Get the cluster manager to launch the instance
(insttype, instances) = cman.launch_cb(n, config)


safe_tupidx(tup, pos) = length(tup) >= pos ? tup[pos] : nothing

if insttype == :io_only
read_cb_response(inst) =
begin
(host, port) = read_worker_host_port(inst)
inst, host, port, host
(host, port) = read_worker_host_port(inst[1])
inst[1], host, port, host, safe_tupidx(inst,2)
end
elseif insttype == :io_host
read_cb_response(inst) =
begin
io = inst[1]
(priv_hostname, port) = read_worker_host_port(io)
io, priv_hostname, port, inst[2]
(priv_hostname, port) = read_worker_host_port(inst[1])
inst[1], priv_hostname, port, inst[2], safe_tupidx(inst,3)
end
elseif insttype == :io_host_port
read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2])
read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2], safe_tupidx(inst,4))
elseif insttype == :host_port
read_cb_response(inst) = (nothing, inst[1], inst[2], inst[1])
read_cb_response(inst) = (nothing, inst[1], inst[2], inst[1], safe_tupidx(inst,3))
elseif insttype == :cmd
read_cb_response(inst) =
begin
io,_ = readsfrom(detach(inst))
(host, port) = read_worker_host_port(io)
io, host, port, host
io, host, port, host, safe_tupidx(inst,2)
end
else
error("Unsupported format from Cluster Manager callback")
end

for i=1:n
(io, privhost, port, pubhost) = read_cb_response(instances[i])
w[i] = create_worker(privhost, port, pubhost, io, config)
(io, privhost, port, pubhost, cman_ctxt) = read_cb_response(instances[i])
w[i] = create_worker(privhost, port, pubhost, io, config, cman_ctxt)
end
w
end
Expand All @@ -976,10 +1003,11 @@ function read_worker_host_port (io::IO)
end
end

function create_worker(privhost, port, pubhost, stream, config)
function create_worker(privhost, port, pubhost, stream, config, cman_ctxt)
tunnel = config[:tunnel]

s = split(pubhost,'@')
user = ""
if length(s) > 1
user = s[1]
pubhost = s[2]
Expand All @@ -997,6 +1025,8 @@ function create_worker(privhost, port, pubhost, stream, config)
else
w = Worker(pubhost, port)
end
w.cman = config[:cman]
w.cman_ctxt = cman_ctxt

if isa(stream, AsyncStream)
stream.line_buffered = true
Expand Down Expand Up @@ -1052,15 +1082,16 @@ function ssh_tunnel(user, host, privhost, port, sshflags)
end


abstract ClusterManager

function launch_procs(n::Integer, config::Dict)
dir = config[:dir]
exename = config[:exename]
exeflags = config[:exeflags]

cman = config[:cman]
outs=cell(n)
cman.config = config # Will need the various startup env later

io_objs = cell(n)
contexts = cell(n)

# start the processes first...
if (cman.ssh)
Expand All @@ -1071,36 +1102,52 @@ function launch_procs(n::Integer, config::Dict)
end

for i in 1:n
io,_ = readsfrom(detach(lcmd(i)))
outs[i] = io
io, pobj = readsfrom(detach(lcmd(i)))
io_objs[i] = io
contexts[i] = cman.ssh ? {:machine => cman.machines[i]} : {:pobj => pobj}
end

# ...and then read the host:port info. This optimizes overall start times.

# For ssh, the tunnel connection, if any, has to be with the specified machine name.
# but the port needs to be forwarded to the private hostname/ip-address
if cman.ssh
for i in 1:n
io = outs[i]
outs[i] = (io, cman.machines[i])
end
end

if cman.ssh
return (:io_host, outs)
return (:io_host, map(i -> (io_objs[i], cman.machines[i], contexts[i]), 1:n))
else
return (:io_only, outs)
return (:io_only, map(i -> (io_objs[i], contexts[i]), 1:n))
end
end

immutable RegularCluster <: ClusterManager
function cman_worker_cb(op, cman, ctxt, pid, args...)
if op == :interrupt
(ospid,) = args
if cman.ssh
machine = ctxt[:machine]
sshflags = cman.config[:sshflags]
if !success(`ssh -n $sshflags $machine "kill -2 $ospid"`)
println("Error sending a Ctrl-C to julia worker $pid on $machine")
end
else
pobj = ctxt[:pobj]
kill(pobj, 2)
end
end
end

type RegularCluster <: ClusterManager
launch_cb::Function
ssh::Bool
machines

RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines)
worker_cb::Function
config::Dict

RegularCluster(; ssh=false, machines=[]) = new(launch_procs, ssh, machines, cman_worker_cb)
end

show(io::IO, cman::RegularCluster) = println("RegularCluster :: ssh: ", cman.ssh, ", machines : ", cman.machines)

# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
Expand Down Expand Up @@ -1535,3 +1582,28 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
ret
end

function interrupt(pid::Integer)
assert(myid() == 1)
w = map_pid_wrkr[pid]
if isa(w, Worker)
if contains(names(w.cman), :worker_cb) && isdefined(w.cman, :worker_cb)
if isdefined(w, :ospid)
w.cman.worker_cb(:interrupt, w.cman, w.cman_ctxt, w.id, w.ospid)
else
# This state can happen immediately after an addprocs
println("Worker $(w.id) cannot be presently interrupted.")
end
else
println("This worker's ClusterManager does not support interrupts.")
end
end
end

function interrupt(pids::AbstractVector=workers())
assert(myid() == 1)
@sync begin
for pid in pids
@async interrupt(pid)
end
end
end