From 8986aa9b1cde7e116874eb4b9bf5cf6142a244a4 Mon Sep 17 00:00:00 2001 From: Kishor Patil Date: Thu, 8 Sep 2016 03:25:51 +0000 Subject: [PATCH] Stop Spark Application if launcher goes down and use reflection --- .../spark/launcher/LauncherBackend.scala | 49 ++++++- .../launcher/AbstractSparkAppHandle.java | 138 ++++++++++++++++++ .../spark/launcher/ChildProcAppHandle.java | 110 +------------- .../spark/launcher/ChildThreadAppHandle.java | 54 +++++++ .../spark/launcher/LauncherConnection.java | 1 - .../spark/launcher/LauncherProtocol.java | 3 + .../apache/spark/launcher/LauncherServer.java | 47 ++++-- .../apache/spark/launcher/SparkLauncher.java | 102 ++++++++++--- .../spark/launcher/SparkSubmitRunner.java | 59 ++++++++ .../org/apache/spark/deploy/yarn/Client.scala | 10 +- 10 files changed, 434 insertions(+), 139 deletions(-) create mode 100644 launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java create mode 100644 launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java create mode 100644 launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala index a5d41a1eeb479..d6f30ad18254b 100644 --- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala +++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala @@ -17,9 +17,11 @@ package org.apache.spark.launcher +import java.io.IOException import java.net.{InetAddress, Socket} import org.apache.spark.SPARK_VERSION +import org.apache.spark.internal.Logging import org.apache.spark.launcher.LauncherProtocol._ import org.apache.spark.util.{ThreadUtils, Utils} @@ -29,26 +31,63 @@ import org.apache.spark.util.{ThreadUtils, Utils} * * See `LauncherServer` for an explanation of how launcher communication works. */ -private[spark] abstract class LauncherBackend { +private[spark] abstract class LauncherBackend extends Logging { private var clientThread: Thread = _ private var connection: BackendConnection = _ private var lastState: SparkAppHandle.State = _ + private var stopFlag: Boolean = false @volatile private var _isConnected = false def connect(): Unit = { val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt) val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET) + val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_FLAG).map(_.toBoolean) if (port != None && secret != None) { - val s = new Socket(InetAddress.getLoopbackAddress(), port.get) + if(stopFlag != None) { + connect(port.get, secret.get, stopFlag.get) + } else { + connect(port.get, secret.get) + } + } + } + + def connect(port: Int, secret: String): Unit = { + if (port != None && secret != None) { + val s = new Socket(InetAddress.getLoopbackAddress(), port) connection = new BackendConnection(s) - connection.send(new Hello(secret.get, SPARK_VERSION)) + connection.send(new Hello(secret, SPARK_VERSION)) clientThread = LauncherBackend.threadFactory.newThread(connection) clientThread.start() _isConnected = true + if(stopFlag) { + val shutdownHook: Runnable = new Runnable() { + def run { + logInfo("LauncherBackend shutdown hook invoked..") + try { + if(_isConnected && stopFlag) { + onStopRequest() + } + } + catch { + case anotherIOE: IOException => { + logInfo("Error while running LauncherBackend shutdownHook...", anotherIOE) + } + } + } + } + + val shutdownHookThread: Thread = LauncherBackend.threadFactory.newThread(shutdownHook) + Runtime.getRuntime.addShutdownHook(shutdownHookThread) + } } } + def connect(port: Int, secret: String, stopFlag: Boolean): Unit = { + this.stopFlag = stopFlag + connect(port, secret) + } + def close(): Unit = { if (connection != null) { try { @@ -71,6 +110,9 @@ private[spark] abstract class LauncherBackend { if (connection != null && lastState != state) { connection.send(new SetState(state)) lastState = state + if(!_isConnected && stopFlag) { + fireStopRequest() + } } } @@ -115,7 +157,6 @@ private[spark] abstract class LauncherBackend { _isConnected = false } } - } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java new file mode 100644 index 0000000000000..ee1188f2e022a --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + + +public abstract class AbstractSparkAppHandle implements SparkAppHandle { + private static final Logger LOG = Logger.getLogger(AbstractSparkAppHandle.class.getName()); + protected final String secret; + protected final LauncherServer server; + protected boolean disposed; + protected List listeners; + protected State state; + private LauncherConnection connection; + private String appId; + + OutputRedirector redirector; + + + + public AbstractSparkAppHandle(LauncherServer server, String secret) { + this.server = server; + this.secret = secret; + this.state = State.UNKNOWN; + } + + @Override + public synchronized void addListener(Listener l) { + if (listeners == null) { + listeners = new ArrayList<>(); + } + listeners.add(l); + } + + @Override + public State getState() { + return state; + } + + @Override + public String getAppId() { + return appId; + } + + @Override + public void stop() { + CommandBuilderUtils.checkState(connection != null, "Application is still not connected."); + try { + connection.send(new LauncherProtocol.Stop()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public synchronized void disconnect() { + if (!disposed) { + disposed = true; + if (connection != null) { + try { + connection.close(); + } catch (IOException ioe) { + // no-op. + } + } + server.unregister(this); + if (redirector != null) { + redirector.stop(); + } + } + } + + String getSecret() { + return secret; + } + + void setConnection(LauncherConnection connection) { + this.connection = connection; + } + + LauncherServer getServer() { + return server; + } + + LauncherConnection getConnection() { + return connection; + } + + void setState(State s) { + if (!state.isFinal()) { + state = s; + fireEvent(false); + } else { + LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", + new Object[]{state, s}); + } + } + + void setAppId(String appId) { + this.appId = appId; + fireEvent(true); + } + + private synchronized void fireEvent(boolean isInfoChanged) { + if (listeners != null) { + for (Listener l : listeners) { + if (isInfoChanged) { + l.infoChanged(this); + } else { + l.stateChanged(this); + } + } + } + } + +} + diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index c0779e1c4e9a7..f7bcc1c1b25c7 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -27,71 +27,14 @@ /** * Handle implementation for monitoring apps started as a child process. */ -class ChildProcAppHandle implements SparkAppHandle { +class ChildProcAppHandle extends AbstractSparkAppHandle { private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); - private final String secret; - private final LauncherServer server; - private Process childProc; - private boolean disposed; - private LauncherConnection connection; - private List listeners; - private State state; - private String appId; - private OutputRedirector redirector; ChildProcAppHandle(String secret, LauncherServer server) { - this.secret = secret; - this.server = server; - this.state = State.UNKNOWN; - } - - @Override - public synchronized void addListener(Listener l) { - if (listeners == null) { - listeners = new ArrayList<>(); - } - listeners.add(l); - } - - @Override - public State getState() { - return state; - } - - @Override - public String getAppId() { - return appId; - } - - @Override - public void stop() { - CommandBuilderUtils.checkState(connection != null, "Application is still not connected."); - try { - connection.send(new LauncherProtocol.Stop()); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - @Override - public synchronized void disconnect() { - if (!disposed) { - disposed = true; - if (connection != null) { - try { - connection.close(); - } catch (IOException ioe) { - // no-op. - } - } - server.unregister(this); - if (redirector != null) { - redirector.stop(); - } - } + super(server, secret); } @Override @@ -103,13 +46,10 @@ public synchronized void kill() { try { childProc.exitValue(); } catch (IllegalThreadStateException e) { - // Child is still alive. Try to use Java 8's "destroyForcibly()" if available, - // fall back to the old API if it's not there. try { - Method destroy = childProc.getClass().getMethod("destroyForcibly"); - destroy.invoke(childProc); - } catch (Exception inner) { childProc.destroy(); + } catch (Exception inner) { + // no-op } } finally { childProc = null; @@ -117,53 +57,11 @@ public synchronized void kill() { } } - String getSecret() { - return secret; - } - void setChildProc(Process childProc, String loggerName) { this.childProc = childProc; this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName, SparkLauncher.REDIRECTOR_FACTORY); } - void setConnection(LauncherConnection connection) { - this.connection = connection; - } - - LauncherServer getServer() { - return server; - } - - LauncherConnection getConnection() { - return connection; - } - - void setState(State s) { - if (!state.isFinal()) { - state = s; - fireEvent(false); - } else { - LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", - new Object[] { state, s }); - } - } - - void setAppId(String appId) { - this.appId = appId; - fireEvent(true); - } - - private synchronized void fireEvent(boolean isInfoChanged) { - if (listeners != null) { - for (Listener l : listeners) { - if (isInfoChanged) { - l.infoChanged(this); - } else { - l.stateChanged(this); - } - } - } - } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java new file mode 100644 index 0000000000000..c2b5eb13cef58 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.util.logging.Logger; + +public class ChildThreadAppHandle extends AbstractSparkAppHandle { + private static final Logger LOG = Logger.getLogger(ChildThreadAppHandle.class.getName()); + private Thread childThread; + + public ChildThreadAppHandle(String secret, LauncherServer server) { + super(server, secret); + } + + @Override + public synchronized void kill() { + if (!disposed) { + disconnect(); + } + if (childThread!= null) { + try { + childThread.join(3000); + } catch (IllegalThreadStateException | InterruptedException e) { + try { + childThread.interrupt(); + } catch (Exception inner) { + LOG.info("Failed to stop Thread: "+ inner.getMessage()); + } + } finally { + childThread = null; + } + } + + } + + void setChildThread(Thread childThread) { + this.childThread = childThread; + } +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java index eec264909bbb6..5991ec80a2bdc 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java @@ -106,5 +106,4 @@ public void close() throws IOException { } } } - } diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java index 042f11cd9e434..5081ac2138125 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java @@ -32,6 +32,9 @@ final class LauncherProtocol { /** Environment variable where the secret for connecting back to the server is stored. */ static final String ENV_LAUNCHER_SECRET = "_SPARK_LAUNCHER_SECRET"; + /** Environment variable where the stop if Launcher is stored. */ + static final String ENV_LAUNCHER_STOP_FLAG = "_SPARK_LAUNCHER_STOP_FLAG"; + static class Message implements Serializable { } diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index ae43f563e8b46..0dbf3d0497e2f 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -89,11 +89,32 @@ class LauncherServer implements Closeable { private static volatile LauncherServer serverInstance; /** - * Creates a handle for an app to be launched. This method will start a server if one hasn't been + * Creates a handle for an app to be launched using ChildProc. + * This method will start a server if one hasn't been * started yet. The server is shared for multiple handles, and once all handles are disposed of, * the server is shut down. */ static synchronized ChildProcAppHandle newAppHandle() throws IOException { + return (ChildProcAppHandle) LauncherServer.newAppHandle(false); + } + + /** + * Creates a handle for an app to be launched from a Thread within the current JVM. + * This method will start a server if one hasn't been + * started yet. The server is shared for multiple handles, and once all handles are disposed of, + * the server is shut down. + */ + static synchronized ChildThreadAppHandle newAppThreadHandle() throws IOException { + return (ChildThreadAppHandle) LauncherServer.newAppHandle(true); + } + + + /** + * Creates a handle for an app to be launched. This method will start a server if one hasn't been + * started yet. The server is shared for multiple handles, and once all handles are disposed of, + * the server is shut down. + */ + static synchronized AbstractSparkAppHandle newAppHandle(boolean isThreadHandle) throws IOException { LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer(); server.ref(); serverInstance = server; @@ -102,8 +123,7 @@ static synchronized ChildProcAppHandle newAppHandle() throws IOException { while (server.pending.containsKey(secret)) { secret = server.createSecret(); } - - return server.newAppHandle(secret); + return server.newAppHandle(secret, isThreadHandle); } static LauncherServer getServerInstance() { @@ -112,7 +132,7 @@ static LauncherServer getServerInstance() { private final AtomicLong refCount; private final AtomicLong threadIds; - private final ConcurrentMap pending; + private final ConcurrentMap pending; private final List clients; private final ServerSocket server; private final Thread serverThread; @@ -157,9 +177,15 @@ public void run() { * Creates a new app handle. The handle will wait for an incoming connection for a configurable * amount of time, and if one doesn't arrive, it will transition to an error state. */ - ChildProcAppHandle newAppHandle(String secret) { - ChildProcAppHandle handle = new ChildProcAppHandle(secret, this); - ChildProcAppHandle existing = pending.putIfAbsent(secret, handle); + AbstractSparkAppHandle newAppHandle(String secret, boolean isThreadHandle) { + + AbstractSparkAppHandle handle; + if(isThreadHandle) { + handle = new ChildThreadAppHandle(secret, this); + } else { + handle = new ChildProcAppHandle(secret, this); + } + AbstractSparkAppHandle existing = pending.putIfAbsent(secret, handle); CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret."); return handle; } @@ -180,6 +206,7 @@ public void close() throws IOException { } } } + if (serverThread != null) { try { serverThread.join(); @@ -215,7 +242,7 @@ int getPort() { * Removes the client handle from the pending list (in case it's still there), and unrefs * the server. */ - void unregister(ChildProcAppHandle handle) { + void unregister(AbstractSparkAppHandle handle) { pending.remove(handle.getSecret()); unref(); } @@ -282,7 +309,7 @@ private String createSecret() { private class ServerConnection extends LauncherConnection { private TimerTask timeout; - private ChildProcAppHandle handle; + private AbstractSparkAppHandle handle; ServerConnection(Socket socket, TimerTask timeout) throws IOException { super(socket); @@ -296,7 +323,7 @@ protected void handle(Message msg) throws IOException { timeout.cancel(); timeout = null; Hello hello = (Hello) msg; - ChildProcAppHandle handle = pending.remove(hello.secret); + AbstractSparkAppHandle handle = pending.remove(hello.secret); if (handle != null) { handle.setConnection(this); handle.setState(SparkAppHandle.State.CONNECTED); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index ea56214d2390c..baf81f793976d 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -73,6 +74,21 @@ public class SparkLauncher { /** Logger name to use when launching a child process. */ public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName"; + /** Launcher Server Port to use when launching a child process. */ + public static final String LAUNCHER_INTERNAL_PORT = "spark.launcher.internal.port"; + + /** Launcher Server sets this when launching a child process. */ + public static final String CHILD_PROCESS_LAUNCHER_INTERNAL_SECRET = "spark.launcher.internal.secret"; + + /** Stop Flag if interrupted Launcher Server goes away. */ + public static final String CHILD_PROCESS_LAUNCHER_STOP_FLAG = "spark.launcher.internal.stop.flag"; + + public SparkLauncher stopIfInterrupted() { + this.stopIfInterrupted = true; + return this; + } + + private boolean stopIfInterrupted = false; /** * A special value for the resource that tells Spark to not try to process the app resource as a * file. This is useful when the class being executed is added to the application using other @@ -503,23 +519,7 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr // Only setup stderr + stdout to logger redirection if user has not otherwise configured output // redirection. if (loggerName == null) { - String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); - if (appName == null) { - if (builder.appName != null) { - appName = builder.appName; - } else if (builder.mainClass != null) { - int dot = builder.mainClass.lastIndexOf("."); - if (dot >= 0 && dot < builder.mainClass.length() - 1) { - appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); - } else { - appName = builder.mainClass; - } - } else if (builder.appResource != null) { - appName = new File(builder.appResource).getName(); - } else { - appName = String.valueOf(COUNTER.incrementAndGet()); - } - } + String appName = getAppName(); String loggerPrefix = getClass().getPackage().getName(); loggerName = String.format("%s.app.%s", loggerPrefix, appName); pb.redirectErrorStream(true); @@ -528,6 +528,7 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(LauncherServer.getServerInstance().getPort())); pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret()); + pb.environment().put(LauncherProtocol.ENV_LAUNCHER_STOP_FLAG, String.valueOf(stopIfInterrupted)); try { handle.setChildProc(pb.start(), loggerName); } catch (IOException ioe) { @@ -538,6 +539,71 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr return handle; } + private String getAppName() throws IOException { + String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); + if (appName == null) { + if (builder.appName != null) { + appName = builder.appName; + } else if (builder.mainClass != null) { + int dot = builder.mainClass.lastIndexOf("."); + if (dot >= 0 && dot < builder.mainClass.length() - 1) { + appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); + } else { + appName = builder.mainClass; + } + } else if (builder.appResource != null) { + appName = new File(builder.appResource).getName(); + } else { + appName = String.valueOf(COUNTER.incrementAndGet()); + } + } + return appName; + } + + /** + * Starts a Spark application. + *

+ * This method returns a handle that provides information about the running application and can + * be used to do basic interaction with it. + *

+ * The returned handle assumes that the application will instantiate a single SparkContext + * during its lifetime. Once that context reports a final state (one that indicates the + * SparkContext has stopped), the handle will not perform new state transitions, so anything + * that happens after that cannot be monitored. The underlying application is launched as + * a Thread, {@link SparkAppHandle#kill()} can still be used to kill the spark application. + *

+ * @since 2.1.0 + * @param listeners Listeners to add to the handle before the app is launched. + * @return A handle for the launched application. + */ + public SparkAppHandle startApplicationInProcess(SparkAppHandle.Listener... listeners) throws IOException { + + ChildThreadAppHandle handle = LauncherServer.newAppThreadHandle(); + for (SparkAppHandle.Listener l : listeners) { + handle.addListener(l); + } + + String appName = getAppName(); + setConf(LAUNCHER_INTERNAL_PORT,String.valueOf(LauncherServer.getServerInstance().getPort())); + setConf(CHILD_PROCESS_LAUNCHER_INTERNAL_SECRET, handle.getSecret()); + setConf(CHILD_PROCESS_LAUNCHER_STOP_FLAG, String.valueOf(stopIfInterrupted)); + try { + //trying to see if method is available in the classpath. + Method main = SparkSubmitRunner.getSparkSubmitMain(); + Thread submitJobThread = new Thread(new SparkSubmitRunner(main, builder.buildSparkSubmitArgs())); + submitJobThread.setName(appName); + handle.setChildThread(submitJobThread); + submitJobThread.start(); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } catch (NoSuchMethodException nsme) { + throw new IOException(nsme); + } + return handle; + + } + + private ProcessBuilder createBuilder() { List cmd = new ArrayList<>(); String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; @@ -613,4 +679,6 @@ protected void handleExtraArgs(List extra) { } + + } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java new file mode 100644 index 0000000000000..eb421012bd5f2 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; + +class SparkSubmitRunner implements Runnable { + private Method main; + private final List args; + + SparkSubmitRunner(Method main, List args) { + this.main = main; + this.args = args; + } + /** + * Trying to see if method is available in the classpath. + */ + protected static Method getSparkSubmitMain() throws ClassNotFoundException, NoSuchMethodException { + Class cls = Class.forName("org.apache.spark.deploy.SparkSubmit"); + return cls.getDeclaredMethod("main", String[].class); + } + + @Override + public void run() { + try { + if(main == null) { + main = getSparkSubmitMain(); + } + Object argsObj = args.toArray(new String[args.size()]); + main.invoke(null, argsObj); + } catch (IllegalAccessException illAcEx) { + throw new RuntimeException(illAcEx); + } catch (InvocationTargetException invokEx) { + throw new RuntimeException(invokEx); + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 2398f0aea316a..a16a7e77a7a2d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -73,6 +73,10 @@ private[spark] class Client( private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" + private val launcherServerPort : Int = sparkConf.get("spark.launcher.internal.port", "0").toInt + private val launcherServerSecret : String = sparkConf.get("spark.launcher.internal.secret", "") + private val launcherServerStopFlag : Boolean = sparkConf.get("spark.launcher.internal.stop.flag", "false").toBoolean + // AM related configurations private val amMemory = if (isClusterMode) { sparkConf.get(DRIVER_MEMORY).toInt @@ -144,7 +148,11 @@ private[spark] class Client( def submitApplication(): ApplicationId = { var appId: ApplicationId = null try { - launcherBackend.connect() + if ( launcherServerSecret !=null && launcherServerSecret != "" && launcherServerPort != 0 && launcherServerStopFlag != null) { + launcherBackend.connect(launcherServerPort, launcherServerSecret, launcherServerStopFlag) + } else { + launcherBackend.connect() + } // Setup the credentials before doing anything else, // so we have don't have issues at any point. setupCredentials()