From 9a580c6722554efcdb20c0666241f9d0b2fc9ccb Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 11 Feb 2019 09:27:00 +0530 Subject: [PATCH 1/3] start_worker options to close stdin and redirect stderr to stdout It does not seem essential to close stdin, and redirect stderr to stdout in workers. While it is appropriate for `DefaultClusterManager` to do these, certain other cluster managers may find it useful not to (e.g. ElasticManager in the ClusterManagers.jl package) fixes: #31035 --- stdlib/Distributed/src/cluster.jl | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index f753fe607250c..551f363e94e26 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -168,7 +168,7 @@ worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0")) ## worker creation and setup ## """ - start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin)) + start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true) `start_worker` is an internal function which is the default entry point for worker processes connecting via TCP/IP. It sets up the process as a Julia cluster @@ -176,18 +176,19 @@ worker. host:port information is written to stream `out` (defaults to stdout). -The function closes stdin (after reading the cookie if required), redirects stderr to stdout, -listens on a free port (or if specified, the port in the `--bind-to` command -line option) and schedules tasks to process incoming TCP connections and requests. +The function reads the cookie from stdin if required, and listens on a free port +(or if specified, the port in the `--bind-to` command line option) and schedules +tasks to process incoming TCP connections and requests. It also (optionally) +closes stdin and redirects stderr to stdout. It does not return. 
""" -start_worker(cookie::AbstractString=readline(stdin)) = start_worker(stdout, cookie) -function start_worker(out::IO, cookie::AbstractString=readline(stdin)) +start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...) +function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true) init_multi() - close(stdin) # workers will not use it - redirect_stderr(stdout) + close_stdin && close(stdin) # workers will not use it + stderr_to_stdout && redirect_stderr(stdout) init_worker(cookie) interface = IPv4(LPROC.bind_addr) From b2d963a94981fdec9279d8fc793d76d282d1b27f Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 18 Feb 2019 08:37:07 +0530 Subject: [PATCH 2/3] added tests for #31036 --- stdlib/Distributed/test/distributed_exec.jl | 34 +++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index 592588c8c0788..93d8afb3b2ea9 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -1493,6 +1493,40 @@ cluster_cookie("foobar") # custom cookie npids = addprocs_with_testenv(WorkerArgTester(`--worker=foobar`, false)) @test remotecall_fetch(myid, npids[1]) == npids[1] +# tests for start_worker options to retain stdio (issue #31035) +struct RetainStdioTester <: ClusterManager + close_stdin::Bool + stderr_to_stdout::Bool +end + +function launch(manager::RetainStdioTester, params::Dict, launched::Array, c::Condition) + dir = params[:dir] + exename = params[:exename] + exeflags = params[:exeflags] + + jlcmd = "using Distributed; start_worker(\"\"; close_stdin=$(manager.close_stdin), stderr_to_stdout=$(manager.stderr_to_stdout));" + cmd = detach(setenv(`$exename $exeflags --bind-to $(Distributed.LPROC.bind_addr) -e $jlcmd`, dir=dir)) + proc = open(cmd, "r+") + + wconfig = WorkerConfig() + wconfig.process = proc + wconfig.io = 
proc.out + push!(launched, wconfig) + + notify(c) +end +manage(::RetainStdioTester, ::Integer, ::WorkerConfig, ::Symbol) = nothing + + +nprocs()>1 && rmprocs(workers()) +cluster_cookie("") + +for close_stdin in (true, false), stderr_to_stdout in (true, false) + npids = addprocs_with_testenv(RetainStdioTester(close_stdin,stderr_to_stdout)) + @test remotecall_fetch(myid, npids[1]) == npids[1] + rmprocs(npids) +end + # Issue # 22865 # Must be run on a new cluster, i.e., all workers must be in the same state. rmprocs(workers()) From d2b00bd54fd1217c659a31834a1c015b4e62b7ff Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 19 Feb 2019 15:31:15 +0530 Subject: [PATCH 3/3] also verify state of stdio streams --- stdlib/Distributed/test/distributed_exec.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index 93d8afb3b2ea9..2636d7367952f 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -1524,6 +1524,8 @@ cluster_cookie("") for close_stdin in (true, false), stderr_to_stdout in (true, false) npids = addprocs_with_testenv(RetainStdioTester(close_stdin,stderr_to_stdout)) @test remotecall_fetch(myid, npids[1]) == npids[1] + @test close_stdin != remotecall_fetch(()->isopen(stdin), npids[1]) + @test stderr_to_stdout == remotecall_fetch(()->(stderr === stdout), npids[1]) rmprocs(npids) end