diff --git a/base/client.jl b/base/client.jl index 3518498795e83..527f174162e60 100644 --- a/base/client.jl +++ b/base/client.jl @@ -171,18 +171,19 @@ end # try to include() a file, ignoring if not found try_include(path::AbstractString) = isfile(path) && include(path) -function init_bind_addr(args::Vector{UTF8String}) +function init_bind_addr() # Treat --bind-to in a position independent manner in ARGS since # --worker, -n and --machinefile options are affected by it - btoidx = findfirst(args, "--bind-to") + btoidx = findfirst(ARGS, "--bind-to") if btoidx > 0 - bind_to = split(args[btoidx+1], ":") + bind_to = split(ARGS[btoidx+1], ":") bind_addr = parseip(bind_to[1]) if length(bind_to) > 1 bind_port = parseint(bind_to[2]) else bind_port = 0 end + deleteat!(ARGS, btoidx:btoidx+1) else bind_port = 0 try @@ -210,10 +211,16 @@ function process_options(args::Vector{UTF8String}) if args[i]=="-q" || args[i]=="--quiet" quiet = true elseif args[i]=="--worker" - start_worker() - # doesn't return - elseif args[i]=="--bind-to" - i+=1 # has already been processed + i+=1 + local manager + try + manager = eval(symbol(args[i])) + catch + # not defined, default to in-built worker startup and transport + manager = ClusterManager + end + start_worker(manager) # Built in manager, doesn't return + elseif args[i]=="-e" || args[i]=="--eval" i == length(args) && error("-e,--eval no provided") repl = false @@ -376,7 +383,7 @@ function _start() try init_parallel() - init_bind_addr(ARGS) + init_bind_addr() any(a->(a=="--worker"), ARGS) || init_head_sched() init_load_path() (quiet,repl,startup,color_set,no_history_file) = process_options(copy(ARGS)) diff --git a/base/exports.jl b/base/exports.jl index c16c24591f2a2..98c602f66caad 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -30,6 +30,7 @@ export BitArray, BitMatrix, BitVector, + BufferStream, CFILE, Cmd, Colon, diff --git a/base/multi.jl b/base/multi.jl index 628ec44e1f80d..d8db2ef83a49d 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -79,52 +79,28 @@ end abstract ClusterManager -type Worker - host::ByteString - port::UInt16 - socket::TCPSocket +type Worker{T<:ClusterManager} + id::Int + r_stream::AsyncStream + w_stream::AsyncStream + config::Dict + sendbuf::IOBuffer del_msgs::Array{Any,1} add_msgs::Array{Any,1} - id::Int gcflag::Bool bind_addr::IPAddr - manager::ClusterManager - config::Dict - Worker(host::AbstractString, port::Integer, sock::TCPSocket, id::Int) = - new(bytestring(host), uint16(port), sock, IOBuffer(), [], [], id, false) -end -Worker(host::AbstractString, port::Integer, sock::TCPSocket) = - Worker(host, port, sock, 0) -function Worker(host::AbstractString, port::Integer) - # Connect to the loopback port if requested host has the same ipaddress as self. 
- if host == string(LPROC.bind_addr) - w = Worker(host, port, connect("127.0.0.1", uint16(port))) - else - w = Worker(host, port, connect(host, uint16(port))) - end - # Avoid calling getaddrinfo if possible - involves a DNS lookup - # host may be a stringified ipv4 / ipv6 address or a dns name - if host == "localhost" - w.bind_addr = parseip("127.0.0.1") - else - try - w.bind_addr = parseip(host) - catch - w.bind_addr = getaddrinfo(host) - end - end - w -end -function Worker(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags) - w = Worker(host, port, - connect("localhost", - ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags))) - w.bind_addr = parseip(bind_addr) - w + Worker(id::Int, r_stream::AsyncStream, w_stream::AsyncStream, config::Dict) = + new(id, r_stream, w_stream, config, IOBuffer(), [], [], false) end +Worker{T<:ClusterManager}(::Type{T}, id::Int, stream::AsyncStream) = Worker(T, id, stream, AnyDict()) +Worker{T<:ClusterManager}(::Type{T}, id::Int, stream::AsyncStream, config::Dict) = Worker(T, id, stream, stream, config) +Worker{T<:ClusterManager}(::Type{T}, id::Int, r_stream::AsyncStream, w_stream::AsyncStream) = Worker(T, id, r_stream, w_stream, AnyDict()) +Worker{T<:ClusterManager}(::Type{T}, id::Int, r_stream::AsyncStream, w_stream::AsyncStream, config::Dict) = Worker{T}(id, r_stream, w_stream, config) + +wtype{T<:ClusterManager}(::Worker{T}) = T function send_msg_now(w::Worker, kind, args...) send_msg_(w, kind, args, true) @@ -151,7 +127,7 @@ function flush_gc_msgs(w::Worker) end #TODO: Move to different Thread -function enq_send_req(sock::TCPSocket, buf, now::Bool) +function enq_send_req(sock::AsyncStream, buf, now::Bool) arr=takebuf_array(buf) write(sock,arr) #TODO implement "now" @@ -168,7 +144,7 @@ function send_msg_(w::Worker, kind, args, now::Bool) if !now && w.gcflag flush_gc_msgs(w) else - enq_send_req(w.socket,buf,now) + enq_send_req(w.w_stream, buf, now) end end @@ -219,16 +195,14 @@ end const PGRP = ProcessGroup([]) get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) -function get_bind_addr(w::Union(Worker, LocalProcess)) - if !isdefined(w, :bind_addr) +get_bind_addr(w::LocalProcess) = LPROC.bind_addr +function get_bind_addr(w::Worker) + if !haskey(w.config, :bind_addr) if w.id != myid() - w.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id) - else - error("LPROC.bind_addr not defined") # Should never happend since LPROC.bind_addr - # is defined early on during process init. + w.config[:bind_addr] = remotecall_fetch(w.id, get_bind_addr, w.id) end end - w.bind_addr + w.config[:bind_addr] end function add_worker(pg::ProcessGroup, w) @@ -236,17 +210,16 @@ function add_worker(pg::ProcessGroup, w) # has the full list of address:port assert(LPROC.id == 1) rr_join = RemoteRef() - w.id = get_next_pid() register_worker(w) - create_message_handler_loop(w.socket; ntfy_join_complete=rr_join) + process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join) - all_locs = map(x -> isa(x, Worker) ? (string(x.bind_addr), x.port, x.id, isa(x.manager, LocalManager)) : ("", 0, x.id, true), pg.workers) + all_locs = map(x -> isa(x, Worker) ? 
(get(x.config, :connect_at, ()), x.id, wtype(x) == LocalManager) : ((), x.id, true), pg.workers) - send_msg_now(w, :join_pgrp, w.id, all_locs, isa(w.manager, LocalManager)) + send_msg_now(w, :join_pgrp, w.id, all_locs, wtype(w) == LocalManager) - @schedule manage(w.manager, w.id, w.config, :register) + @schedule manage(wtype(w), w.id, w.config, :register) - (w.id, rr_join) + rr_join end myid() = LPROC.id @@ -260,8 +233,8 @@ end procs() = Int[x.id for x in PGRP.workers] function procs(pid::Integer) if myid() == 1 - if (pid == 1) || isa(map_pid_wrkr[pid].manager, LocalManager) - Int[x.id for x in filter(w -> (w.id==1) || isa(w.manager, LocalManager), PGRP.workers)] + if (pid == 1) || (wtype(map_pid_wrkr[pid]) == LocalManager) + Int[x.id for x in filter(w -> (w.id==1) || (wtype(w) == LocalManager), PGRP.workers)] else ipatpid = get_bind_addr(pid) Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)] @@ -336,7 +309,7 @@ end function worker_id_from_socket(s) w = get(map_sock_wrkr, s, nothing) if isa(w,Worker) - if is(s, w.socket) || is(s, w.sendbuf) + if is(s, w.r_stream) || is(s, w.w_stream) || is(s, w.sendbuf) return w.id end end @@ -347,13 +320,18 @@ function worker_id_from_socket(s) return -1 end +function reset_node() + LPROC.id = 0 + empty!(map_pid_wrkr) +end register_worker(w) = register_worker(PGRP, w) function register_worker(pg, w) push!(pg.workers, w) map_pid_wrkr[w.id] = w if isa(w, Worker) - map_sock_wrkr[w.socket] = w + map_sock_wrkr[w.r_stream] = w + map_sock_wrkr[w.w_stream] = w map_sock_wrkr[w.sendbuf] = w end end @@ -363,12 +341,15 @@ function deregister_worker(pg, pid) pg.workers = filter(x -> !(x.id == pid), pg.workers) w = pop!(map_pid_wrkr, pid, nothing) if isa(w, Worker) - pop!(map_sock_wrkr, w.socket) + pop!(map_sock_wrkr, w.r_stream) + if w.r_stream != w.w_stream + pop!(map_sock_wrkr, w.w_stream) + end pop!(map_sock_wrkr, w.sendbuf) # Notify the cluster manager of this workers death if myid() == 1 - manage(w.manager, w.id, w.config, :deregister) + manage(wtype(w), w.id, w.config, :deregister) end end push!(map_del_wrkr, pid) @@ -793,57 +774,71 @@ function accept_handler(server::TCPServer, status::Int32) error("an error occured during the creation of the server") end client = accept_nonblock(server) - create_message_handler_loop(client) + process_messages(client, client) +end + +function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...) + @schedule begin + disable_nagle(r_stream) + start_reading(r_stream) + wait_connected(r_stream) + if r_stream != w_stream + disable_nagle(w_stream) + wait_connected(w_stream) + end + create_message_handler_loop(r_stream, w_stream; kwargs...) + end end -function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothing) #returns immediately +function process_messages(r_stream::AsyncStream, w_stream::AsyncStream; kwargs...) + create_message_handler_loop(r_stream, w_stream; kwargs...) 
+end + +function create_message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream; ntfy_join_complete=nothing) #returns immediately @schedule begin global PGRP - #println("message_handler_loop") - disable_nagle(sock) - start_reading(sock) - wait_connected(sock) + global cluster_manager try while true - msg = deserialize(sock) - # println("got msg: ",msg) + msg = deserialize(r_stream) + #println("got msg: ",msg) # handle message if is(msg, :call) - id = deserialize(sock) + id = deserialize(r_stream) #print("$(myid()) got id $id\n") - f0 = deserialize(sock) + f0 = deserialize(r_stream) #print("$(myid()) got call $f0\n") - args0 = deserialize(sock) + args0 = deserialize(r_stream) #print("$(myid()) got args $args0\n") let f=f0, args=args0 schedule_call(id, ()->f(args...)) end elseif is(msg, :call_fetch) - id = deserialize(sock) - f = deserialize(sock) - args = deserialize(sock) + id = deserialize(r_stream) + f = deserialize(r_stream) + args = deserialize(r_stream) let f=f, args=args, id=id, msg=msg @schedule begin v = run_work_thunk(()->f(args...)) - deliver_result(sock, msg, id, v) + deliver_result(w_stream, msg, id, v) v end end elseif is(msg, :call_wait) - id = deserialize(sock) - notify_id = deserialize(sock) - f = deserialize(sock) - args = deserialize(sock) + id = deserialize(r_stream) + notify_id = deserialize(r_stream) + f = deserialize(r_stream) + args = deserialize(r_stream) let f=f, args=args, id=id, msg=msg, notify_id=notify_id @schedule begin rv = schedule_call(id, ()->f(args...)) - deliver_result(sock, msg, notify_id, wait_full(rv)) + deliver_result(w_stream, msg, notify_id, wait_full(rv)) end end elseif is(msg, :do) - f = deserialize(sock) - args = deserialize(sock) + f = deserialize(r_stream) + args = deserialize(r_stream) #print("got args: $args\n") let f=f, args=args @schedule begin @@ -852,39 +847,29 @@ function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothi end elseif is(msg, :result) # used to deliver result of wait or fetch - oid = deserialize(sock) + oid = deserialize(r_stream) #print("$(myid()) got $msg $oid\n") - val = deserialize(sock) + val = deserialize(r_stream) put!(lookup_ref(oid), val) elseif is(msg, :identify_socket) - otherid = deserialize(sock) - register_worker(Worker("", 0, sock, otherid)) + otherid = deserialize(r_stream) + register_worker(Worker(cluster_manager, otherid, r_stream, w_stream)) elseif is(msg, :join_pgrp) - # first connection; get process group info from client - self_pid = LPROC.id = deserialize(sock) - locs = deserialize(sock) - self_is_local = deserialize(sock) - #print("\nLocation: ",locs,"\nId:",myid(),"\n") - # joining existing process group - - controller = Worker("", 0, sock, 1) + self_pid = LPROC.id = deserialize(r_stream) + locs = deserialize(r_stream) + self_is_local = deserialize(r_stream) + controller = Worker(cluster_manager, 1, r_stream, w_stream) register_worker(controller) register_worker(LPROC) - for (rhost, rport, rpid, r_is_local) in locs + for (connect_at, rpid, r_is_local) in locs if (rpid < self_pid) && (!(rpid == 1)) # Connect to them - if self_is_local && r_is_local - # If on localhost, use the loopback address - this addresses - # the special case of system suspend wherein the local ip - # may be changed upon system awake. 
- w = Worker("127.0.0.1", rport) - else - w = Worker(rhost, rport) - end - w.id = rpid + config = AnyDict(:pid=>rpid, :connect_at=>connect_at, :self_is_local=>self_is_local, :r_is_local=>r_is_local) + (r_s, w_s) = connect_w2w(cluster_manager, rpid, config) + w = Worker(cluster_manager, rpid, r_s, w_s, config) register_worker(w) - create_message_handler_loop(w.socket) + process_messages(w.r_stream, w.w_stream) send_msg_now(w, :identify_socket, self_pid) else # Others will connect to us. Don't do anything just yet @@ -901,10 +886,10 @@ function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothi end # end of while catch e - iderr = worker_id_from_socket(sock) + iderr = worker_id_from_socket(r_stream) # If error occured talking to pid 1, commit harakiri if iderr == 1 - if isopen(sock) + if isopen(w_stream) print(STDERR, "fatal error on ", myid(), ": ") display_error(e, catch_backtrace()) end @@ -916,7 +901,8 @@ function create_message_handler_loop(sock::AsyncStream; ntfy_join_complete=nothi # to each other on unhandled errors deregister_worker(iderr) - if isopen(sock) close(sock) end + if isopen(r_stream) close(r_stream) end + if isopen(w_stream) close(w_stream) end if (myid() == 1) global rmprocset @@ -941,8 +927,8 @@ end # the entry point for julia worker processes. does not return. # argument is descriptor to write listening port # to. -start_worker() = start_worker(STDOUT) -function start_worker(out::IO) +start_worker{T<:ClusterManager}(::Type{T}) = start_worker(T, STDOUT) +function start_worker{T<:ClusterManager}(::Type{T}, out::IO) # we only explicitly monitor worker STDOUT on the console, so redirect # stderr to stdout so we can see the output. # at some point we might want some or all worker output to go to log @@ -951,6 +937,7 @@ function start_worker(out::IO) # exit when process 1 shut down. Don't yet know why. 
#redirect_stderr(STDOUT) + init_worker(T) if LPROC.bind_port == 0 (actual_port,sock) = listenany(uint16(9009)) LPROC.bind_port = actual_port @@ -966,7 +953,6 @@ function start_worker(out::IO) # close STDIN; workers will not use it #close(STDIN) - disable_threaded_libs() disable_nagle(sock) try @@ -980,42 +966,41 @@ function start_worker(out::IO) exit(0) end -function read_cb_response(io::IO, config::Dict) - (host, port) = read_worker_host_port(io) - return (io, host, port, host, config) -end - -function read_cb_response(io::IO, host::AbstractString, config::Dict) - (bind_addr, port) = read_worker_host_port(io) - return (io, bind_addr, port, host, config) -end - -read_cb_response(io::IO, host::AbstractString, port::Integer, config::Dict) = (io, host, port, host, config) - -read_cb_response(host::AbstractString, port::Integer, config::Dict) = (nothing, host, port, host, config) - - -function start_cluster_workers(np::Integer, config::Dict, manager::ClusterManager, resp_arr::Array, launched_ntfy::Condition) +function start_cluster_workers{T <: ClusterManager}(::Type{T}, config::Dict, resp_arr::Array, launched_ntfy::Condition) # Get the cluster manager to launch the instance - instance_sets = [] instances_ntfy = Condition() - t = @schedule launch(manager, np, config, instance_sets, instances_ntfy) + launched = [] + t = @schedule begin + try + launch(T, config, launched, instances_ntfy) + catch e + println("Error launching workers with ", T, " : ", e) + end + end while true - if (length(instance_sets) == 0) - istaskdone(t) && break + if length(launched) == 0 + if istaskdone(t) + break + end @schedule (sleep(1); notify(instances_ntfy)) wait(instances_ntfy) end - if length(instance_sets) > 0 - instances = shift!(instance_sets) - for inst in instances - (io, bind_addr, port, pubhost, wconfig) = read_cb_response(inst...) 
- push!(resp_arr, create_worker(bind_addr, port, pubhost, io, wconfig, manager)) - notify(launched_ntfy) - end + if length(launched) > 0 + wconfig = shift!(launched) + pid = get_next_pid() + wconfig[:pid] = pid + + (r_s, w_s) = connect_m2w(T, pid, wconfig) + + w = Worker{T}(pid, r_s, w_s, wconfig) + # install a finalizer to perform cleanup if necessary + finalizer(w, (w)->if myid() == 1 manage(T, w.id, w.config, :finalize) end) + + push!(resp_arr, w) + notify(launched_ntfy) end end @@ -1033,51 +1018,6 @@ function read_worker_host_port(io::IO) end end -function create_worker(bind_addr, port, pubhost, stream, config, manager) - tunnel = config[:tunnel] - - s = split(pubhost,'@') - user = "" - if length(s) > 1 - user = s[1] - pubhost = s[2] - else - if haskey(ENV, "USER") - user = ENV["USER"] - elseif tunnel - error("USER must be specified either in the environment or as part of the hostname when tunnel option is used") - end - end - - if tunnel - sshflags = config[:sshflags] - w = Worker(pubhost, bind_addr, port, user, sshflags) - else - w = Worker(bind_addr, port) - end - - w.config = config - w.manager = manager - - if isa(stream, AsyncStream) - let wrker = w - # redirect console output from workers to the client's stdout: - @async begin - while !eof(stream) - line = readline(stream) - print("\tFrom worker $(wrker.id):\t$line") - end - end - end - end - - # install a finalizer to perform cleanup if necessary - finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end) - - w -end - - function parse_connection_info(str) m = match(r"^julia_worker:(\d+)#(.*)", str) if m != nothing @@ -1105,38 +1045,143 @@ function ssh_tunnel(user, host, bind_addr, port, sshflags) localp end - immutable LocalManager <: ClusterManager end +cluster_manager = LocalManager # The default. +function init_worker{T<:ClusterManager}(::Type{T}) + reset_node() + global cluster_manager + cluster_manager = T + disable_threaded_libs() +end + show(io::IO, manager::LocalManager) = println("LocalManager()") -function launch(manager::LocalManager, np::Integer, config::Dict, resp_arr::Array, c::Condition) +function launch{T<:LocalManager}(::Type{T}, config::Dict, launched::Array, c::Condition) dir = config[:dir] exename = config[:exename] exeflags = config[:exeflags] - io_objs = cell(np) - configs = cell(np) - - # start the processes first... - for i in 1:np + for i in 1:config[:np] io, pobj = open(detach(`$(dir)/$(exename) $exeflags --bind-to $(LPROC.bind_addr)`), "r") - io_objs[i] = io - configs[i] = merge(config, AnyDict(:process => pobj)) + wconfig = copy(config) + wconfig[:process] = pobj + wconfig[:io] = io + push!(launched, wconfig) end - # ...and then read the host:port info. This optimizes overall start times. 
- push!(resp_arr, collect(zip(io_objs, configs))) notify(c) end -function manage(manager::LocalManager, id::Integer, config::Dict, op::Symbol) +function manage{T<:LocalManager}(::Type{T}, id::Integer, config::Dict, op::Symbol) if op == :interrupt kill(config[:process], 2) end end +function connect_m2w{T<:ClusterManager}(::Type{T}, pid::Int, config::Dict) + pid = config[:pid] + io = get(config, :io, nothing) + if isa(io, AsyncStream) + (bind_addr, port) = read_worker_host_port(io) + pubhost=get(config, :host, bind_addr) + else + pubhost=config[:host] + port=config[:port] + bind_addr=pubhost + end + + tunnel = get(config, :tunnel, false) + + s = split(pubhost,'@') + user = "" + if length(s) > 1 + user = s[1] + pubhost = s[2] + else + if haskey(ENV, "USER") + user = ENV["USER"] + elseif tunnel + error("USER must be specified either in the environment or as part of the hostname when tunnel option is used") + end + end + + if tunnel + sshflags = config[:sshflags] + (s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags) + else + (s, bind_addr) = connect_to_worker(bind_addr, port) + end + + config[:host] = pubhost + config[:port] = port + config[:bind_addr] = bind_addr + + # write out a subset of the connect_at required for further worker-worker connection setups + config[:connect_at] = (string(bind_addr), port) + + if isa(io, AsyncStream) + let pid = pid + # redirect console output from workers to the client's stdout: + @async begin + while !eof(io) + line = readline(io) + print("\tFrom worker $(pid):\t$line") + end + end + end + end + + (s, s) +end + +function connect_w2w{T<:ClusterManager}(::Type{T}, pid::Int, config::Dict) + (rhost, rport) = config[:connect_at] + config[:host] = rhost + config[:port] = rport + if get(config, :self_is_local, false) && get(config, :r_is_local, false) + # If on localhost, use the loopback address - this addresses + # the special case of system suspend wherein the local ip + # may be changed upon system awake. + (s, bind_addr) = connect_to_worker("127.0.0.1", rport) + else + (s, bind_addr)= connect_to_worker(rhost, rport) + end + + (s,s) +end + + +function connect_to_worker(host::AbstractString, port::Integer) + # Connect to the loopback port if requested host has the same ipaddress as self. + if host == string(LPROC.bind_addr) + s = connect("127.0.0.1", uint16(port)) + else + s = connect(host, uint16(port)) + end + + # Avoid calling getaddrinfo if possible - involves a DNS lookup + # host may be a stringified ipv4 / ipv6 address or a dns name + if host == "localhost" + bind_addr = parseip("127.0.0.1") + else + try + bind_addr = parseip(host) + catch + bind_addr = getaddrinfo(host) + end + end + (s, bind_addr) +end + + +function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port::Integer, tunnel_user::AbstractString, sshflags) + s = connect("localhost", ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags)) + (s, parseip(bind_addr)) +end + + immutable SSHManager <: ClusterManager machines::Dict @@ -1152,15 +1197,24 @@ end show(io::IO, manager::SSHManager) = println("SSHManager(machines=", manager.machines, ")") -function launch(manager::SSHManager, np::Integer, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition) +function launch{T<:SSHManager}(::Type{T}, config::Dict, launched::Array, machines_launch_ntfy::Condition) # Launch on each unique host in parallel. # Wait for all launches to complete. 
plaunch_ntfy = Condition() + manager = config[:manager] launch_tasks = cell(length(manager.machines)) for (i,(machine, cnt)) in enumerate(manager.machines) - launch_tasks[i] = @schedule launch_on_machine(manager, config, resp_arr, machines_launch_ntfy, machine, cnt, plaunch_ntfy) + let machine=machine, cnt=cnt + launch_tasks[i] = @schedule begin + try + launch_on_machine(manager, machine, config, launched, machines_launch_ntfy, cnt, plaunch_ntfy) + catch e + println(e) + end + end + end end while length(launch_tasks) > 0 @@ -1175,13 +1229,15 @@ function launch(manager::SSHManager, np::Integer, config::Dict, resp_arr::Array, end -function launch_on_machine(manager::SSHManager, config::Dict, resp_arr::Array, machines_launch_ntfy::Condition, - machine::AbstractString, cnt::Integer, plaunch_ntfy::Condition) +function launch_on_machine(manager::SSHManager, machine, config, launched, machines_launch_ntfy::Condition, + cnt, plaunch_ntfy::Condition) + # We don't expect dir, exename and exeflags to change for each worker on this machine + dir = config[:dir] exename = config[:exename] exeflags_base = config[:exeflags] - thisconfig = copy(config) # config for this worker + thisconfig = copy(config) # config for workers on this machine # machine could be of the format [user@]host[:port] bind_addr[:bind_port] machine_bind = split(machine) @@ -1198,18 +1254,17 @@ function launch_on_machine(manager::SSHManager, config::Dict, resp_arr::Array, m thisconfig[:sshflags] = sshflags host = machine_def[1] + thisconfig[:host] = host # Build up the ssh command cmd = `cd $dir && $exename $exeflags` # launch julia cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch - thisconfig[:machine] = host - - # start the processes in parallel first... maxp = config[:max_parallel] - if config[:tunnel] + if get(config, :tunnel, false) maxp = div(maxp,2) + 1 # Since the tunnel will also take up one ssh connection end @@ -1221,6 +1276,9 @@ for io in ios_to_check if nb_available(io) == 0 push!(ios_to_check2, io) + else + push!(launched, merge(AnyDict(:io=>io, :host=>host), thisconfig)) + notify(machines_launch_ntfy) end end ios_to_check=ios_to_check2 @@ -1236,30 +1294,25 @@ end lc = cnt > maxp_in_loop ? maxp_in_loop : cnt - io_objs = cell(lc) - configs = cell(lc) - for i in 1:lc io, pobj = open(detach(cmd), "r") - io_objs[i] = io push!(ios_to_check, io) end cnt = cnt - lc - - # ...and then read the host:port info. This optimizes overall start times. - # For ssh, the tunnel connection, if any, has to be with the specified machine name.
- # but the port needs to be forwarded to the bound hostname/ip-address - push!(resp_arr, collect(zip(io_objs, fill(host, lc), fill(thisconfig, lc)))) - notify(machines_launch_ntfy) - t_check=time() end + for io in ios_to_check + push!(launched, merge(AnyDict(:io=>io, :host=>host), thisconfig)) + end + + notify(machines_launch_ntfy) notify(plaunch_ntfy) end -function manage(manager::SSHManager, id::Integer, config::Dict, op::Symbol) + +function manage{T<:SSHManager}(::Type{T}, id::Integer, config::Dict, op::Symbol) if op == :interrupt if haskey(config, :ospid) machine = config[:machine] @@ -1285,7 +1338,17 @@ function addprocs_internal(np::Integer; exename=(ccall(:jl_is_debugbuild,Cint,())==0?"./julia":"./julia-debug"), sshflags::Cmd=``, manager=LocalManager(), exeflags=``, max_parallel=10) - config = AnyDict(:dir=>dir, :exename=>exename, :exeflags=>`$exeflags --worker`, :tunnel=>tunnel, :sshflags=>sshflags, :max_parallel=>max_parallel) + config = AnyDict( + :dir=>dir, + :exename=>exename, + :exeflags=>`$exeflags --worker $(typeof(manager))`, + :tunnel=>tunnel, + :sshflags=>sshflags, + :max_parallel=>max_parallel, + :manager=>manager, + :np=>np + ) + disable_threaded_libs() ret = Array(Int, 0) @@ -1294,19 +1357,27 @@ function addprocs_internal(np::Integer; resp_arr = [] c = Condition() - t = @schedule start_cluster_workers(np, config, manager, resp_arr, c) + t = @schedule begin + try + start_cluster_workers(typeof(manager), config, resp_arr, c) + catch e + println("Error starting cluster workers with ", typeof(manager), " : ", e) + end + end while true if length(resp_arr) == 0 - istaskdone(t) && break + if istaskdone(t) + break + end @schedule (sleep(1); notify(c)) wait(c) end if length(resp_arr) > 0 w = shift!(resp_arr) - id, rr = add_worker(PGRP, w) - push!(ret, id) + rr = add_worker(PGRP, w) + push!(ret, w.id) push!(rr_join, rr) end end @@ -1315,8 +1386,11 @@ function addprocs_internal(np::Integer; wait(rr) end - assert(length(ret) == np) - ret + if np > 0 + assert(length(ret) == np) + end + + sort!(ret) end addprocs(np::Integer; kwargs...) = addprocs_internal(np; kwargs...) @@ -1630,7 +1704,7 @@ function interrupt(pid::Integer) assert(myid() == 1) w = map_pid_wrkr[pid] if isa(w, Worker) - manage(w.manager, w.id, w.config, :interrupt) + manage(wtype(w), w.id, w.config, :interrupt) end end interrupt(pids::Integer...) = interrupt([pids...]) @@ -1644,6 +1718,7 @@ function interrupt(pids::AbstractVector=workers()) end end + function disable_nagle(sock) # disable nagle on all OSes ccall(:uv_tcp_nodelay, Cint, (Ptr{Void}, Cint), sock.handle, 1) @@ -1662,11 +1737,11 @@ function check_same_host(pids) # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. 
This handles the special case + where the local ip address may change - as during a system sleep/awake - if all(p -> (p==1) || isa(map_pid_wrkr[p].manager, LocalManager), pids) + if all(p -> (p==1) || (wtype(map_pid_wrkr[p]) == LocalManager), pids) return true else - first_bind_addr = map_pid_wrkr[pids[1]].bind_addr - return all(p -> (p != 1) && (map_pid_wrkr[p].bind_addr == first_bind_addr), pids[2:end]) + first_bind_addr = map_pid_wrkr[pids[1]].config[:bind_addr] + return all(p -> (p != 1) && (map_pid_wrkr[p].config[:bind_addr] == first_bind_addr), pids[2:end]) end end end @@ -1689,3 +1764,4 @@ function terminate_all_workers() end end end + diff --git a/base/stream.jl b/base/stream.jl index f58ab568437af..1124328a9ceb8 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -958,3 +958,48 @@ mark(x::AsyncStream) = mark(x.buffer) unmark(x::AsyncStream) = unmark(x.buffer) reset(x::AsyncStream) = reset(x.buffer) ismarked(x::AsyncStream) = ismarked(x.buffer) + + + +# BufferStreams are non-OS streams, backed by a regular IOBuffer +type BufferStream <: AsyncStream + buffer::IOBuffer + r_c::Condition + close_c::Condition + is_open::Bool + + BufferStream() = new(PipeBuffer(), Condition(), Condition(), true) +end + +isopen(s::BufferStream) = s.is_open +close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing) + +function wait_readnb(s::BufferStream, nb::Int) + while isopen(s) && nb_available(s.buffer) < nb + wait(s.r_c) + end +end + +function eof(s::BufferStream) + wait_readnb(s,1) + !isopen(s) && nb_available(s.buffer)<=0 +end + +# wait(s::BufferStream) = wait_readnb(s,1) + +show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open) + +nb_available(s::BufferStream) = nb_available(s.buffer) + +function wait_readbyte(s::BufferStream, c::UInt8) + while isopen(s) && search(s.buffer,c) <= 0 + wait(s.r_c) + end +end + +wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end +start_reading(s::BufferStream) = nothing + +write(s::BufferStream, b::UInt8) = (rv=write(s.buffer, b); notify(s.r_c; all=true);rv) +write{T}(s::BufferStream, a::Array{T}) = (rv=write(s.buffer, a); notify(s.r_c; all=true);rv) +write(s::BufferStream, p::Ptr, nb::Integer) = (rv=write(s.buffer, p, nb); notify(s.r_c; all=true);rv) diff --git a/examples/clustermanager/README b/examples/clustermanager/README new file mode 100644 index 0000000000000..1197d37cc7d51 --- /dev/null +++ b/examples/clustermanager/README @@ -0,0 +1,26 @@ +This is a proof-of-concept that uses ZeroMQ as the transport. +It uses a star topology as opposed to the native mesh network. + +The ZMQ package must be installed. All workers run on localhost only. + +All Julia nodes connect only to a "broker" process that listens on known ports +8100 and 8101 via ZMQ sockets. + + +All commands must be run from the `examples/clustermanager` directory. + +First, start the broker. In a new console type: + julia broker.jl + +Next, start a Julia REPL and type: + push!(LOAD_PATH, "$(pwd())/ZMQCM") + using ZMQCM + ZMQCM.start_master(4) # start with four workers + + +Alternatively, the test script head.jl can be run. It launches the requested number of workers, +executes a simple command on all of them, and exits. + julia head.jl 4 + +NOTE: As stated above, this is a proof-of-concept. A real Julia cluster using ZMQ will probably use +different ZMQ socket types and optimize the transport further.
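For orientation before reading ZMQCM.jl below: stripped of the ZMQ details, the callback surface this patch expects a custom transport to implement looks roughly like the following sketch. `MyManager`, its `:np` entry, and the commented-out bodies are hypothetical stand-ins, not part of the patch; the signatures mirror the LocalManager/SSHManager methods above.

    # Sketch only of a custom ClusterManager under this patch; MyManager is hypothetical.
    import Base: launch, manage, connect_m2w, connect_w2w

    immutable MyManager <: ClusterManager end

    function launch{T<:MyManager}(::Type{T}, config::Dict, launched::Array, c::Condition)
        for i in 1:config[:np]
            wconfig = copy(config)      # one config Dict per worker
            # start a worker process here; record whatever connect_m2w
            # will need (an :io stream, a host/port, a zid, ...) in wconfig
            push!(launched, wconfig)
        end
        notify(c)                       # wakes the waiting start_cluster_workers loop
    end

    # Master-to-worker connection: must return a (read_stream, write_stream) pair of
    # AsyncStreams, and should store enough in config[:connect_at] for later
    # worker-to-worker dials (it is forwarded to the workers as part of :join_pgrp).
    function connect_m2w{T<:MyManager}(::Type{T}, pid::Int, config::Dict)
        # e.g. a pair of BufferStreams bridged to the real transport
    end

    # Worker-to-worker connection: config[:connect_at] carries what the master stored.
    function connect_w2w{T<:MyManager}(::Type{T}, pid::Int, config::Dict)
        # same contract: return (read_stream, write_stream)
    end

    # Cluster events: op is one of :register, :interrupt, :deregister, :finalize.
    manage{T<:MyManager}(::Type{T}, id::Integer, config::Dict, op::Symbol) = nothing

A worker started by such a manager calls Base.init_worker(MyManager) and hands each connection's streams to Base.process_messages(r_s, w_s), exactly as ZMQCM.jl does below.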
diff --git a/examples/clustermanager/ZMQCM/ZMQCM.jl b/examples/clustermanager/ZMQCM/ZMQCM.jl new file mode 100644 index 0000000000000..3dc11d1b92099 --- /dev/null +++ b/examples/clustermanager/ZMQCM/ZMQCM.jl @@ -0,0 +1,284 @@ +module ZMQCM + +import Base: launch, manage, connect_m2w, connect_w2w +export connect_m2w, connect_w2w, launch, manage, start_broker, start_master, start_worker + +using ZMQ + +const BROKER_SUB_PORT = 8100 +const BROKER_PUB_PORT = 8101 + +const SELF_INITIATED = 0 +const REMOTE_INITIATED = 1 + +const PAYLOAD_MSG = "J" +const CONTROL_MSG = "Z" + +const REQUEST_ACK = "R" +const ACK_MSG = "A" + +type ZMQCMan <: ClusterManager + map_zmq_julia::Dict{Int, Tuple} + c::Condition + isfree::Bool + ctx + pub + sub + zid_self + ZMQCMan() = new(Dict{Int, Tuple}(), Condition(), true) +end + +const manager = ZMQCMan() + +function lock_for_send() + if manager.isfree == true + manager.isfree = false + else + while manager.isfree == false + wait(manager.c) + if manager.isfree == true + manager.isfree = false + return + end + end + end +end + +function release_lock_for_send() + manager.isfree = true + notify(manager.c, all=true) +end + +function init_node(zid=0) + manager.ctx = Context(1) + pub=Socket(manager.ctx, PUB) # Outbound + connect(pub, "tcp://127.0.0.1:$BROKER_SUB_PORT") + + sub=Socket(manager.ctx, SUB) # In bound + connect(sub, "tcp://127.0.0.1:$BROKER_PUB_PORT") + ZMQ.set_subscribe(sub, string(zid)) + + manager.pub = pub + manager.sub = sub + manager.zid_self = zid + + (pub, sub) +end + +function send_data(zid, mtype, data) + lock_for_send() + ZMQ.send(manager.pub, Message(string(zid)), SNDMORE) + ZMQ.send(manager.pub, Message(string(manager.zid_self)), SNDMORE) + #println("Sending message of type $mtype to $zid") + ZMQ.send(manager.pub, Message(mtype), SNDMORE) + ZMQ.send(manager.pub, Message(data)) + release_lock_for_send() +end + +function setup_connection(zid, initiated_by) + try + read_stream=BufferStream() + write_stream=BufferStream() + + if initiated_by == REMOTE_INITIATED + test_remote = false + else + test_remote = true + end + + manager.map_zmq_julia[zid] = (read_stream, write_stream, test_remote) + + @schedule begin + while true + (r_s, w_s, do_test_remote) = manager.map_zmq_julia[zid] + if do_test_remote + send_data(zid, CONTROL_MSG, REQUEST_ACK) + sleep(0.5) + else + break + end + end + (r_s, w_s, do_test_remote) = manager.map_zmq_julia[zid] + + while true + data = readavailable(w_s) + send_data(zid, PAYLOAD_MSG, data) + end + end + (read_stream, write_stream) + catch e + Base.show_backtrace(STDOUT,catch_backtrace()) + println(e) + rethrow(e) + end +end + +# BROKER +function start_broker() + ctx=Context(1) + xpub=Socket(ctx, XPUB) + xsub=Socket(ctx, XSUB) + + ZMQ.bind(xsub, "tcp://127.0.0.1:$(BROKER_SUB_PORT)") + ZMQ.bind(xpub, "tcp://127.0.0.1:$(BROKER_PUB_PORT)") + + ccall((:zmq_proxy, :libzmq), Cint, (Ptr{Void}, Ptr{Void}, Ptr{Void}), xpub.data, xsub.data, C_NULL) + + # control never comes here + ZMQ.close(xpub) + ZMQ.close(xsub) + ZMQ.close(ctx) +end + +function recv_data() + try + #println("On $(manager.zid_self) waiting to recv message") + zid = int(bytestring(ZMQ.recv(manager.sub))) + assert(zid == manager.zid_self) + + from_zid = int(bytestring(ZMQ.recv(manager.sub))) + mtype = bytestring(ZMQ.recv(manager.sub)) + + #println("$zid received message of type $mtype from $from_zid") + + data = ZMQ.recv(manager.sub) + if mtype == CONTROL_MSG + cmsg = bytestring(data) + if cmsg == REQUEST_ACK + #println("$from_zid REQUESTED_ACK from $zid") + # send back a 
control_msg + send_data(from_zid, CONTROL_MSG, ACK_MSG) + elseif cmsg == ACK_MSG + #println("$zid got ACK_MSG from $from_zid") + (r_s, w_s, test_remote) = manager.map_zmq_julia[from_zid] + manager.map_zmq_julia[from_zid] = (r_s, w_s, false) + else + error("Unknown control message : ", cmsg) + end + data = "" + end + + (from_zid, data) + catch e + Base.show_backtrace(STDOUT,catch_backtrace()) + println(e) + rethrow(e) + end + +end + +# MASTER +function start_master(np) + init_node() + @schedule begin + try + while true + (from_zid, data) = recv_data() + + #println("master recv data from $from_zid") + + (r_s, w_s, t_r) = manager.map_zmq_julia[from_zid] + write(r_s, convert(Ptr{Uint8}, data), length(data)) + end + catch e + Base.show_backtrace(STDOUT,catch_backtrace()) + println(e) + rethrow(e) + end + end + + addprocs(np; manager=manager) +end + + +function launch{T<:ZMQCMan}(::Type{T}, config::Dict, launched::Array, c::Condition) + #println("launch{T<:ZMQCMan}") + l2 = [] + for i in 1:config[:np] + io, pobj = open(`julia worker.jl $i`, "r") + wconfig = copy(config) + wconfig[:zid] = i + wconfig[:io] = io + push!(l2, wconfig) + end + append!(launched, l2) + notify(c) +end + +function connect_m2w{T<:ZMQCMan}(::Type{T}, pid::Int, config::Dict) + #println("connect_m2w") + zid = config[:zid] + manager = config[:manager] + + print_worker_stdout(config[:io], pid) + (r_s, w_s) = setup_connection(zid, SELF_INITIATED) + + config[:connect_at] = zid # This will be useful in the worker-to-worker connection setup. + (r_s, w_s) +end + +# WORKER +function start_worker(zid) + #println("start_worker") + Base.init_worker(ZMQCMan) + init_node(zid) + + while true + (from_zid, data) = recv_data() + + #println("worker recv data from $from_zid") + + streams = get(manager.map_zmq_julia, from_zid, nothing) + if streams == nothing + # First time.. 
+ (r_s, w_s) = setup_connection(from_zid, REMOTE_INITIATED) + Base.process_messages(r_s, w_s) + else + (r_s, w_s, t_r) = streams + end + + write(r_s, convert(Ptr{Uint8}, data), length(data)) + end +end + +function connect_w2w{T<:ZMQCMan}(::Type{T}, pid::Int, config::Dict) + #println("connect_w2w") + zid = config[:connect_at] + setup_connection(zid, SELF_INITIATED) +end + + +function manage{T<:ZMQCMan}(::Type{T}, id::Int, config::Dict, op) + nothing +end + +function print_worker_stdout(io, pid) + @async begin + while !eof(io) + line = readline(io) + print("\tFrom worker $(pid):\t$line") + end + end +end + +# function test_broker(zself, zrem) +# init_node(zself) +# (r, w) = setup_connection(zrem) +# +# @schedule begin +# while true +# println("Sending from ", zself) +# write(w, "FROM : $zself") +# sleep(1) +# end +# end +# while true +# zid = int(bytestring(ZMQ.recv(manager.sub))) +# data = ZMQ.recv(manager.sub) +# +# println("recv data ", ASCIIString(data), ", zid: $zid") +# end +# end +# + +end \ No newline at end of file diff --git a/examples/clustermanager/broker.jl b/examples/clustermanager/broker.jl new file mode 100644 index 0000000000000..cb35580aa752e --- /dev/null +++ b/examples/clustermanager/broker.jl @@ -0,0 +1,9 @@ +push!(LOAD_PATH, "$(pwd())/ZMQCM") + +using ZMQCM + +if length(ARGS) == 2 + ZMQCM.test_broker(int(ARGS[1]), int(ARGS[2])) +else + ZMQCM.start_broker() +end \ No newline at end of file diff --git a/examples/clustermanager/head.jl b/examples/clustermanager/head.jl new file mode 100644 index 0000000000000..948c8ae80f3a1 --- /dev/null +++ b/examples/clustermanager/head.jl @@ -0,0 +1,9 @@ +push!(LOAD_PATH, "$(pwd())/ZMQCM") + +using ZMQCM + +ZMQCM.start_master(int(ARGS[1])) + +resp = pmap(x -> myid() *2, [1:nworkers()]) + +println(resp) diff --git a/examples/clustermanager/worker.jl b/examples/clustermanager/worker.jl new file mode 100644 index 0000000000000..b0be636cb8276 --- /dev/null +++ b/examples/clustermanager/worker.jl @@ -0,0 +1,5 @@ +push!(LOAD_PATH, "$(pwd())/ZMQCM") + +using ZMQCM + +ZMQCM.start_worker(int(ARGS[1]))
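A final note on BufferStream (added in base/stream.jl above), since it is what lets a transport like this one feed bytes into base's message loop without an OS socket. A minimal usage sketch, assuming a build with this patch applied; it relies on the generic AsyncStream read path picking up the wait_readnb/nb_available/start_reading methods that BufferStream defines:

    s = BufferStream()
    write(s, "hello")          # lands in the internal PipeBuffer, notifies waiting readers
    @assert nb_available(s) == 5
    data = readavailable(s)    # drains whatever is currently buffered
    close(s)                   # wakes tasks blocked in wait_readnb / wait_close
    @assert eof(s)             # closed and drained, so eof is now true

ZMQCM.jl uses exactly this pattern: each peer gets a pair of BufferStreams, incoming ZMQ payloads are written to the read stream that process_messages deserializes from, and a background task calls readavailable on the write stream and ships the bytes out over ZMQ.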