[SPARK-17443][SPARK-11035] Stop Spark Application if launcher goes down and use reflection #15009

Closed
wants to merge 55 commits into from
Changes from 45 commits
8986aa9
Stop Spark Application if launcher goes down and use reflection
kishorvpatil Sep 8, 2016
99b1d1b
Fixing coding style issues and review comments
kishorvpatil Sep 8, 2016
70a67fb
formatting fixes
kishorvpatil Sep 8, 2016
bac5cd2
Remove extra empty lines
kishorvpatil Sep 8, 2016
38a7e3a
Formatting and code review comments
kishorvpatil Sep 9, 2016
67f8de7
Remove unwanted conditional to null check Boolean value
kishorvpatil Sep 9, 2016
57defa9
Addressing code review comments
kishorvpatil Sep 14, 2016
eaa1bca
Hide launcher internal configs from Environment page
kishorvpatil Sep 15, 2016
a3250ac
Avoid using System properties while launching Application in thread mode
kishorvpatil Oct 3, 2016
58c6bac
Addressing few formatting related comments
kishorvpatil Oct 18, 2016
0a57684
Adding SparkApp Trait
kishorvpatil Oct 19, 2016
1fe498b
Adding documentation to launcher package
kishorvpatil Oct 19, 2016
25c3258
Another minor code comment fix
kishorvpatil Oct 19, 2016
14050f5
Fixing scala style issues
kishorvpatil Oct 19, 2016
92e4445
Make SparkApp trait Singleton
kishorvpatil Oct 20, 2016
64a21b3
Addressing Review comments
kishorvpatil Oct 28, 2016
1e6311a
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Nov 7, 2016
c207b30
Fix review minor comments
kishorvpatil Nov 7, 2016
ad20ccc
Fixing code to address review comments
kishorvpatil Jan 30, 2017
6a7ba5b
rename variable
kishorvpatil Jan 31, 2017
3cb8e85
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Jan 31, 2017
3091105
Adding scala doc to the method
kishorvpatil Jan 31, 2017
2fdcec9
Fixing format to curly braces
kishorvpatil Jan 31, 2017
64a0e45
Fixing comment formatting
kishorvpatil Feb 2, 2017
4357107
Fixing review comments on use of sys.env
kishorvpatil Feb 6, 2017
2707d21
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Feb 8, 2017
cc2c0be
Addressing review comments on documentation and minor variable changes
kishorvpatil Feb 10, 2017
82df055
Addressing review comments
kishorvpatil Mar 6, 2017
99d8c29
Fixing few more comments
kishorvpatil Mar 7, 2017
41cf6da
Adding tests for SparkLauncher in thread model
kishorvpatil Mar 8, 2017
b098ecd
Update documentation
kishorvpatil Mar 8, 2017
b66243d
Adding code review comments on documentation
kishorvpatil Mar 10, 2017
bc99435
Adding integration tests
kishorvpatil Mar 14, 2017
517fed0
Fixing scala style
kishorvpatil Mar 14, 2017
a3d18b4
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Mar 14, 2017
c17f15f
Use val instead of var
kishorvpatil Mar 15, 2017
026d026
formatting and refactoring
kishorvpatil Mar 20, 2017
fe5b5d6
Refactor tests code
kishorvpatil Mar 21, 2017
677edf7
Refactor launcher variables to package private
kishorvpatil Mar 22, 2017
ee3f24a
Fixing compilation errors
kishorvpatil Mar 28, 2017
30b460c
Fixing compilation errors
kishorvpatil Mar 28, 2017
7323200
Addressing code review comments
kishorvpatil Apr 4, 2017
0cfd4a7
Remove sys.env reference and passing it to SparkApp
kishorvpatil Apr 6, 2017
8609874
Removing changes related to passing sys.env
kishorvpatil Apr 19, 2017
ab50dd8
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Apr 20, 2017
14c6365
Fixing documentation and review comments
kishorvpatil Apr 26, 2017
04e56fc
Adding check to launch as thread
kishorvpatil Apr 27, 2017
7ee465f
Fixing breaking tests
kishorvpatil Apr 28, 2017
3f060b6
Fix documentation
kishorvpatil Apr 28, 2017
f1b49d8
Addressing review comments
kishorvpatil May 4, 2017
2996fb1
Fix minor review comments
kishorvpatil May 9, 2017
a311721
Fixing SparkLauncherSuite test
kishorvpatil May 10, 2017
d906072
Fix SparkLauncherSuite with waitFor method.
kishorvpatil May 23, 2017
81fd297
Fixing SparkLauncherSuite unit test
kishorvpatil Jun 14, 2017
10513ec
Fixing review comments
kishorvpatil Jun 14, 2017
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkApp.scala
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* An interface that can be implemented by applications launched by SparkSubmit
* which exposes the Spark job configuration explicitly.
*/
private[spark] trait SparkApp {
this: Singleton =>

/**
* Method executed by SparkSubmit to run the application.
*
* @param args - all arguments for SparkApp.
* @param conf - Spark Configuration.
*/
def sparkMain(args: Array[String], conf: Map[String, String]): Unit

}
29 changes: 21 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction
import java.text.ParseException

import scala.annotation.tailrec
import scala.collection.JavaConverters._
Review comment (Contributor):
Is this being used anywhere?

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties

@@ -691,10 +692,6 @@ object SparkSubmit extends CommandLineUtils {
addJarToClasspath(jar, loader)
}

for ((key, value) <- sysProps) {
System.setProperty(key, value)
}

var mainClass: Class[_] = null

try {
@@ -725,9 +722,15 @@ object SparkSubmit extends CommandLineUtils {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
val sparkAppMainMethod = mainClass.getMethods().find(_.getName == "sparkMain")
val childSparkConf = sysProps.filter{ p => p._1.startsWith("spark.") }.toMap
Review comment (Contributor):
nit: again, please address the feedback that is given. I've lost count of how many times I've pointed out that there's a missing space between filter and {.


// If running a SparkApp we can explicitly pass in the confs separately.
// If we aren't running a SparkApp they get passed via the system properties.
if (sparkAppMainMethod.isEmpty) {
sysProps.foreach { case (key, value) =>
System.setProperty(key, value)
}
}

@tailrec
@@ -741,7 +744,17 @@ object SparkSubmit extends CommandLineUtils {
}

try {
mainMethod.invoke(null, childArgs.toArray)
if (sparkAppMainMethod.isDefined) {
sparkAppMainMethod.get.invoke(null, childArgs.toArray, childSparkConf)
} else {
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}

mainMethod.invoke(null, childArgs.toArray)
}
} catch {
case t: Throwable =>
findCause(t) match {
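
The dispatch in this hunk (call sparkMain with an explicit conf map when the class has one, otherwise set system properties and call a static main) can be illustrated with a small Java sketch. It is not the PR's code: DispatchSketch, ConfApp and PlainApp are hypothetical names, a java.util.Map stands in for the Scala Map[String, String], and the entry points are assumed to be static so that invoke(null, ...) works.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical app that receives its configuration explicitly.
class ConfApp {
  static String seen;

  public static void sparkMain(String[] args, Map<String, String> conf) {
    seen = "sparkMain:" + conf.get("spark.master");
  }
}

// Hypothetical app with only a conventional main(); it reads system properties.
class PlainApp {
  static String seen;

  public static void main(String[] args) {
    seen = "main:" + System.getProperty("spark.master");
  }
}

class DispatchSketch {
  static void run(Class<?> mainClass, String[] args, Map<String, String> props) {
    // Look for a public method named "sparkMain" on the class.
    Optional<Method> sparkMain = Arrays.stream(mainClass.getMethods())
        .filter(m -> m.getName().equals("sparkMain"))
        .findFirst();
    try {
      if (sparkMain.isPresent()) {
        // Pass only the "spark."-prefixed entries, and pass them explicitly.
        Map<String, String> conf = new HashMap<>();
        props.forEach((k, v) -> {
          if (k.startsWith("spark.")) {
            conf.put(k, v);
          }
        });
        sparkMain.get().invoke(null, args, conf);
      } else {
        // Fallback: publish everything as system properties, then call main().
        props.forEach(System::setProperty);
        Method main = mainClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(main.getModifiers())) {
          throw new IllegalStateException("The main method must be static");
        }
        // Cast so the String[] is passed as one argument, not spread as varargs.
        main.invoke(null, (Object) args);
      }
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("spark.master", "local");
    run(ConfApp.class, args, props);
    run(PlainApp.class, args, props);
    System.out.println(ConfApp.seen + " / " + PlainApp.seen);
  }
}
```

In this sketch the sparkMain branch never touches System properties, which is the property the hunk is after when several applications share one JVM.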
@@ -17,35 +17,59 @@

package org.apache.spark.launcher

import java.io.IOException
import java.net.{InetAddress, Socket}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}

/**
* A class that can be used to talk to a launcher server. Users should extend this class to
* provide implementation for the abstract methods.
*
* See `LauncherServer` for an explanation of how launcher communication works.
*/
private[spark] abstract class LauncherBackend {
private[spark] abstract class LauncherBackend extends Logging {

private var clientThread: Thread = _
private var connection: BackendConnection = _
private var lastState: SparkAppHandle.State = _
private var stopOnShutdown: Boolean = false
@volatile private var _isConnected = false

def connect(): Unit = {
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_IF_SHUTDOWN).map(_.toBoolean)
if (port != None && secret != None) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
connect(port.get, secret.get, stopFlag.getOrElse(false))
}
}

def connect(port: Int, secret: String, stopFlag: Boolean): Unit = {
this.stopOnShutdown = stopFlag
val s = new Socket(InetAddress.getLoopbackAddress(), port)
connection = new BackendConnection(s)
connection.send(new Hello(secret, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
if (stopOnShutdown) {
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking onStopRequest() from shutdown hook")
try {
if (_isConnected) {
onStopRequest()
}
} catch {
case ioException: IOException =>
logError("Error while running LauncherBackend shutdownHook...", ioException)
}
}
}
}

@@ -110,12 +134,14 @@ private[spark] abstract class LauncherBackend {
override def close(): Unit = {
try {
super.close()
if (stopOnShutdown) {
Review comment (Contributor):
See previous comment for why I don't think this is correct.

fireStopRequest()
}
} finally {
onDisconnected()
_isConnected = false
}
}

}

}
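
The stop-on-shutdown behaviour added to connect can be sketched with a plain JVM shutdown hook. This is an illustration under stated assumptions, not the PR's implementation: it uses Runtime.addShutdownHook instead of Spark's ShutdownHookManager, and BackendSketch, stopIfConnected and the stopRequests counter are hypothetical stand-ins for LauncherBackend and onStopRequest().

```java
import java.util.concurrent.atomic.AtomicInteger;

class BackendSketch {
  private volatile boolean connected;
  final AtomicInteger stopRequests = new AtomicInteger();

  // Hypothetical stand-in for connect(port, secret, stopFlag); no socket is opened.
  void connect(boolean stopOnShutdown) {
    connected = true;
    if (stopOnShutdown) {
      // The hook runs when the JVM begins shutting down.
      Runtime.getRuntime().addShutdownHook(new Thread(this::stopIfConnected));
    }
  }

  void close() {
    connected = false;
  }

  // Body of the hook: request a stop only while the connection is still up.
  void stopIfConnected() {
    if (connected) {
      stopRequests.incrementAndGet(); // stands in for onStopRequest()
    }
  }

  public static void main(String[] args) {
    BackendSketch backend = new BackendSketch();
    backend.connect(true);
    System.out.println("hook registered; it runs when the JVM exits");
  }
}
```

Keeping the hook body in its own method makes the connected check easy to exercise without actually shutting down the JVM.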
@@ -183,6 +183,28 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

@Test
public void testThreadLauncher() throws Exception {
// This test is failed on Windows due to the failure of initiating executors
// by the path length limitation. See SPARK-18718.
assumeTrue(!Utils.isWindows());

launcher
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"-Dfoo=bar -Dtest.appender=childproc")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.setMainClass(SparkLauncherTestApp.class.getName())
.launchAsThread(true)
.addAppArgs("proc");
final Process app = launcher.launch();
Review comment (Contributor):
What is this testing? launch() will always launch a child process. Which indicates two problems:

  • this test is not testing anything that hasn't been tested before.
  • SparkLauncher should probably be throwing an error if you use .launchAsThread(true) and then call launch().
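
The second point could look roughly like the guard below. This is a sketch only: LauncherSketch is a hypothetical stand-in, not SparkLauncher, and the only behaviour it models is rejecting launch() once thread mode has been requested.

```java
class LauncherSketch {
  private boolean launchAsThread;

  LauncherSketch launchAsThread(boolean value) {
    this.launchAsThread = value;
    return this;
  }

  // Models the child-process launch; refuses to run when thread mode was requested.
  String launch() {
    if (launchAsThread) {
      throw new IllegalStateException(
          "launch() starts a child process and cannot be combined with launchAsThread(true)");
    }
    return "child process";
  }

  public static void main(String[] args) {
    System.out.println(new LauncherSketch().launch());
    try {
      new LauncherSketch().launchAsThread(true).launch();
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```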


new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
assertEquals(0, app.waitFor());
}

public static class SparkLauncherTestApp {

public static void main(String[] args) throws Exception {
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractSparkAppHandle implements SparkAppHandle {
private static final Logger LOG = Logger.getLogger(AbstractSparkAppHandle.class.getName());
Review comment (Contributor):
nit: getName() is unnecessary.

Reply (Contributor Author):
java.util.logging.Logger#getLogger would complain if argument is not a String.

Reply (Contributor):
Nevermind, forgot this is java.util.logging not slf4j...
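
For reference, the point settled in this thread: java.util.logging.Logger.getLogger takes the logger name as a String, so the class name has to be passed explicitly. A minimal check follows; the class name JulNameSketch is hypothetical.

```java
import java.util.logging.Logger;

class JulNameSketch {
  // java.util.logging has no getLogger(Class) overload, hence getName().
  static final Logger LOG = Logger.getLogger(JulNameSketch.class.getName());

  public static void main(String[] args) {
    // Prints the logger's name, which equals the class name passed above.
    System.out.println(LOG.getName());
  }
}
```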


protected final String secret;
Review comment (Contributor):
nit: please add a blank line between static and non-static fields.

protected final LauncherServer server;
protected boolean disposed;
protected List<Listener> listeners;
protected State state;
private LauncherConnection connection;
private String appId;
OutputRedirector redirector;

Review comment (Contributor, @tgravescs, Sep 8, 2016):
remove extra blank newlines

public AbstractSparkAppHandle(LauncherServer server, String secret) {
this.server = server;
this.secret = secret;
this.state = State.UNKNOWN;
}

@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}

@Override
public State getState() {
return state;
}

@Override
public String getAppId() {
return appId;
}

@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
if (redirector != null) {
redirector.stop();
}
}
}

String getSecret() {
return secret;
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

LauncherServer getServer() {
return server;
}

LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[]{state, s});
}
}

void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

private synchronized void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}
}
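
The guard in setState (ignore any transition once a final state has been reached, otherwise update and notify listeners) can be seen in isolation in the sketch below. It is a simplified model rather than the handle class itself: HandleSketch and its three-value State enum are hypothetical, listeners are plain Runnables, and setState returns a boolean instead of logging a warning.

```java
import java.util.ArrayList;
import java.util.List;

class HandleSketch {
  // Hypothetical, reduced state set; only FINISHED is final here.
  enum State {
    UNKNOWN(false), RUNNING(false), FINISHED(true);

    private final boolean isFinal;

    State(boolean isFinal) {
      this.isFinal = isFinal;
    }

    boolean isFinal() {
      return isFinal;
    }
  }

  private State state = State.UNKNOWN;
  private final List<Runnable> listeners = new ArrayList<>();

  synchronized void addListener(Runnable l) {
    listeners.add(l);
  }

  synchronized State getState() {
    return state;
  }

  // Returns true when the transition was applied, false when it was ignored
  // because the current state is already final.
  synchronized boolean setState(State s) {
    if (state.isFinal()) {
      return false;
    }
    state = s;
    for (Runnable l : listeners) {
      l.run();
    }
    return true;
  }

  public static void main(String[] args) {
    HandleSketch handle = new HandleSketch();
    handle.addListener(() -> System.out.println("state is now " + handle.getState()));
    handle.setState(State.RUNNING);
    handle.setState(State.FINISHED);
    System.out.println("late transition applied: " + handle.setState(State.RUNNING));
  }
}
```

Unlike the class in the diff, this sketch synchronizes both the state update and the read, so a listener never observes a half-applied transition.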