
[SPARK-17443][SPARK-11035] Stop Spark Application if launcher goes down and use reflection #15009

Status: Closed (wants to merge 55 commits)

Commits (the diff below shows changes from 4 of the 55 commits):
8986aa9
Stop Spark Application if launcher goes down and use reflection
kishorvpatil Sep 8, 2016
99b1d1b
Fixing coding style issues and review comments
kishorvpatil Sep 8, 2016
70a67fb
formatting fixes
kishorvpatil Sep 8, 2016
bac5cd2
Remove extra empty lines
kishorvpatil Sep 8, 2016
38a7e3a
Formatting and code review comments
kishorvpatil Sep 9, 2016
67f8de7
Remove unwanted conditional to null check Boolean value
kishorvpatil Sep 9, 2016
57defa9
Addressing code review comments
kishorvpatil Sep 14, 2016
eaa1bca
Hide launcher internal configs from Environment page
kishorvpatil Sep 15, 2016
a3250ac
Avoid using System properties while launching Application in thread mode
kishorvpatil Oct 3, 2016
58c6bac
Addressing few formatting related comments
kishorvpatil Oct 18, 2016
0a57684
Adding SparkApp Trait
kishorvpatil Oct 19, 2016
1fe498b
Adding documentation to launcher package
kishorvpatil Oct 19, 2016
25c3258
Another minor code comment fix
kishorvpatil Oct 19, 2016
14050f5
Fixing scala style issues
kishorvpatil Oct 19, 2016
92e4445
Make SparkApp trait Singleton
kishorvpatil Oct 20, 2016
64a21b3
Addressing Review comments
kishorvpatil Oct 28, 2016
1e6311a
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Nov 7, 2016
c207b30
Fix review minor comments
kishorvpatil Nov 7, 2016
ad20ccc
Fixing code to address review comments
kishorvpatil Jan 30, 2017
6a7ba5b
rename variable
kishorvpatil Jan 31, 2017
3cb8e85
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Jan 31, 2017
3091105
Adding scala doc to the method
kishorvpatil Jan 31, 2017
2fdcec9
Fixing format to curly braces
kishorvpatil Jan 31, 2017
64a0e45
Fixing comment formatting
kishorvpatil Feb 2, 2017
4357107
Fixing review comments on use of sys.env
kishorvpatil Feb 6, 2017
2707d21
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Feb 8, 2017
cc2c0be
Addressing review comments on documentation and minor variable changes
kishorvpatil Feb 10, 2017
82df055
Addressing review comments
kishorvpatil Mar 6, 2017
99d8c29
Fixing few more comments
kishorvpatil Mar 7, 2017
41cf6da
Adding tests for SparkLauncher in thread model
kishorvpatil Mar 8, 2017
b098ecd
Update documentation
kishorvpatil Mar 8, 2017
b66243d
Adding code review comments on documentation
kishorvpatil Mar 10, 2017
bc99435
Adding integration tests
kishorvpatil Mar 14, 2017
517fed0
Fixing scala style
kishorvpatil Mar 14, 2017
a3d18b4
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Mar 14, 2017
c17f15f
Use val instead of var
kishorvpatil Mar 15, 2017
026d026
formatting and refactoring
kishorvpatil Mar 20, 2017
fe5b5d6
Refactor tests code
kishorvpatil Mar 21, 2017
677edf7
Refactor launcher variables to package private
kishorvpatil Mar 22, 2017
ee3f24a
Fixing compilation errors
kishorvpatil Mar 28, 2017
30b460c
Fixing compilation errors
kishorvpatil Mar 28, 2017
7323200
Addressing code review comments
kishorvpatil Apr 4, 2017
0cfd4a7
Remove sys.env reference and passing it to SparkApp
kishorvpatil Apr 6, 2017
8609874
Removing changes related to passing sys.env
kishorvpatil Apr 19, 2017
ab50dd8
Merge branch 'master' of git://git.apache.org/spark into SPARK-17443
kishorvpatil Apr 20, 2017
14c6365
Fixing documentation and review comments
kishorvpatil Apr 26, 2017
04e56fc
Adding check to launch as thread
kishorvpatil Apr 27, 2017
7ee465f
Fixing breaking tests
kishorvpatil Apr 28, 2017
3f060b6
Fix documentation
kishorvpatil Apr 28, 2017
f1b49d8
Addressing review comments
kishorvpatil May 4, 2017
2996fb1
Fix minor review comments
kishorvpatil May 9, 2017
a311721
Fixing SparkLauncherSuite test
kishorvpatil May 10, 2017
d906072
Fix SparkLauncherSuite with waitFor method.
kishorvpatil May 23, 2017
81fd297
Fixing SparkLauncherSuite unit test
kishorvpatil Jun 14, 2017
10513ec
Fixing review comments
kishorvpatil Jun 14, 2017
File: LauncherBackend.scala
@@ -17,38 +17,71 @@

package org.apache.spark.launcher

import java.io.IOException
import java.net.{InetAddress, Socket}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}

/**
* A class that can be used to talk to a launcher server. Users should extend this class to
* provide implementation for the abstract methods.
*
* See `LauncherServer` for an explanation of how launcher communication works.
*/
private[spark] abstract class LauncherBackend {
private[spark] abstract class LauncherBackend extends Logging {

private var clientThread: Thread = _
private var connection: BackendConnection = _
private var lastState: SparkAppHandle.State = _
private var stopFlag: Boolean = false
[Review comment, Contributor] nit: the type annotation is not necessary. Also, not a big fan of the name of this variable; it should be more descriptive, like stopOnShutdown.

@volatile private var _isConnected = false

def connect(): Unit = {
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_FLAG).map(_.toBoolean)
if (port != None && secret != None) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
if (stopFlag != None) {
connect(port.get, secret.get, stopFlag.get)
[Review comment, Contributor] can't we just set this.stopFlag here and have one connect method, since that is all this one is doing?
[Review comment, Contributor] ignore this, I didn't see it being called from Client
[Review comment, Contributor] Instead of the if, you could just have connect(port, secret, stop.getOrElse(false))

} else {
connect(port.get, secret.get)
}
}
}

def connect(port: Int, secret: String): Unit = {
[Review comment, Contributor] This method (but not the code!) seems redundant. If you had just connect(port: Int, secret: String, stopFlag: Boolean) then you probably wouldn't need the stopFlag field in the first place.

val s = new Socket(InetAddress.getLoopbackAddress(), port)
connection = new BackendConnection(s)
connection.send(new Hello(secret, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
if (stopFlag) {
logDebug("Adding shutdown hook") // force eager creation of logger
var _shutdownHookRef = ShutdownHookManager.addShutdownHook(
[Review comment, Contributor] You're not using _shutdownHookRef anywhere, are you? So it can go away, which would let you better indent this code block, which is a little confusing.

ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking onStopRequest() from shutdown hook")
try {
if (_isConnected && stopFlag) {
onStopRequest()
}
}
catch {
[Review comment, Contributor] nit: move to previous line

case anotherIOE: IOException =>
[Review comment, Contributor] Why anotherIOE? There's no previous exception being handled here.

logError("Error while running LauncherBackend shutdownHook...", anotherIOE)
}
}
}
}
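For reference, here is a minimal sketch (not part of the diff) of how the hook registration could look with the nits above applied: the unused _shutdownHookRef binding dropped, the catch moved onto the closing brace, and the exception binding renamed. The stopOnShutdown name follows the earlier naming suggestion and is an assumption, not the name used in this revision.

```scala
// Sketch only: shutdown-hook registration with the review nits applied.
// Assumes this lives inside LauncherBackend, after the connection is set up.
if (stopOnShutdown) {
  logDebug("Adding shutdown hook") // force eager creation of logger
  ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking onStopRequest() from shutdown hook")
    try {
      if (_isConnected) {
        onStopRequest()
      }
    } catch {
      case ioe: IOException =>
        logError("Error while running LauncherBackend shutdown hook", ioe)
    }
  }
}
```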

def connect(port: Int, secret: String, stopFlag: Boolean): Unit = {
[Review comment, Contributor] You haven't addressed my previous feedback here: #15009 (comment)
[Reply, Contributor Author] This is addressed now.
[Review comment, Contributor] You haven't addressed it, one way or another. You don't need all these connect() methods. You need only two: one without any parameters, and one with all three. The variant with two parameters is not needed.

this.stopFlag = stopFlag
connect(port, secret)
}
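A sketch (again, not the code in this revision) of the two-method shape the reviewer describes: one parameterless connect() that reads the environment, and one connect(port, secret, stopOnShutdown) that does all the work, making the two-argument overload and the stopFlag field unnecessary. ENV_LAUNCHER_STOP_FLAG is the variable this PR introduces; stopOnShutdown is the suggested rename and an assumption here.

```scala
// Sketch only: the consolidated connect() API suggested in review.
def connect(): Unit = {
  val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
  val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
  val stopOnShutdown = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_FLAG)
    .map(_.toBoolean).getOrElse(false) // defaults to false when unset
  if (port.isDefined && secret.isDefined) {
    connect(port.get, secret.get, stopOnShutdown)
  }
}

def connect(port: Int, secret: String, stopOnShutdown: Boolean): Unit = {
  val s = new Socket(InetAddress.getLoopbackAddress(), port)
  connection = new BackendConnection(s)
  connection.send(new Hello(secret, SPARK_VERSION))
  clientThread = LauncherBackend.threadFactory.newThread(connection)
  clientThread.start()
  _isConnected = true
  if (stopOnShutdown) {
    // register the shutdown hook, as sketched above
  }
}
```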

def close(): Unit = {
if (connection != null) {
try {
@@ -71,6 +104,9 @@ private[spark] abstract class LauncherBackend {
if (connection != null && lastState != state) {
connection.send(new SetState(state))
lastState = state
if (!_isConnected && stopFlag) {
[Review comment, Contributor] do we still need this if it's being done in close() now?

fireStopRequest()
}
}
}

@@ -110,12 +146,14 @@
override def close(): Unit = {
try {
super.close()
if (!_isConnected && stopFlag) {
fireStopRequest()
}
} finally {
onDisconnected()
_isConnected = false
}
}

}

}
File: AbstractSparkAppHandle.java (new file)
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.launcher;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;


[Review comment, Contributor] nit: too many blank lines

public abstract class AbstractSparkAppHandle implements SparkAppHandle {
[Review comment, Contributor] These classes should not be public; only SparkAppHandle is a public interface. Package private should be enough.

private static final Logger LOG = Logger.getLogger(AbstractSparkAppHandle.class.getName());
[Review comment, Contributor] nit: getName() is unnecessary.
[Reply, Contributor Author] java.util.logging.Logger#getLogger would complain if the argument is not a String.
[Review comment, Contributor] Nevermind, forgot this is java.util.logging not slf4j...

protected final String secret;
[Review comment, Contributor] nit: please add a blank line between static and non-static fields.

protected final LauncherServer server;
protected boolean disposed;
protected List<Listener> listeners;
protected State state;
private LauncherConnection connection;
private String appId;
OutputRedirector redirector;

[Review comment, @tgravescs, Sep 8, 2016] remove extra blank newlines

public AbstractSparkAppHandle(LauncherServer server, String secret) {
this.server = server;
this.secret = secret;
this.state = State.UNKNOWN;
}

@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}

@Override
public State getState() {
return state;
}

@Override
public String getAppId() {
return appId;
}

@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
if (redirector != null) {
redirector.stop();
}
}
}

String getSecret() {
return secret;
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

LauncherServer getServer() {
return server;
}

LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[]{state, s});
}
}

void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

private synchronized void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}
}
File: ChildProcAppHandle.java
@@ -27,71 +27,14 @@
/**
* Handle implementation for monitoring apps started as a child process.
*/
class ChildProcAppHandle implements SparkAppHandle {
class ChildProcAppHandle extends AbstractSparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());

private final String secret;
private final LauncherServer server;

private Process childProc;
private boolean disposed;
private LauncherConnection connection;
private List<Listener> listeners;
private State state;
private String appId;
private OutputRedirector redirector;

ChildProcAppHandle(String secret, LauncherServer server) {
this.secret = secret;
this.server = server;
this.state = State.UNKNOWN;
}

@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}

@Override
public State getState() {
return state;
}

@Override
public String getAppId() {
return appId;
}

@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
if (redirector != null) {
redirector.stop();
}
}
super(server, secret);
}

@Override
@@ -103,67 +46,22 @@ public synchronized void kill() {
try {
childProc.exitValue();
} catch (IllegalThreadStateException e) {
// Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
// fall back to the old API if it's not there.
try {
Method destroy = childProc.getClass().getMethod("destroyForcibly");
destroy.invoke(childProc);
} catch (Exception inner) {
childProc.destroy();
} catch (Exception inner) {
[Review comment, Contributor] This will never happen. You're breaking the existing functionality. See my previous comment (https://github.com/apache/spark/pull/15009/files#r84788125).

// no-op
}
} finally {
childProc = null;
}
}
}

String getSecret() {
return secret;
}

void setChildProc(Process childProc, String loggerName) {
this.childProc = childProc;
this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

LauncherServer getServer() {
return server;
}

LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { state, s });
}
}

void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

private synchronized void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}

}
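For context, the handle classes above implement the public SparkAppHandle interface returned by SparkLauncher.startApplication(). Below is a minimal usage sketch, not part of this diff; the jar path, main class, and master are placeholders, and the listener callbacks correspond to the fireEvent(...) calls in the handle implementations above.

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherExample {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/my-spark-app.jar") // placeholder jar
      .setMainClass("com.example.MySparkApp")      // placeholder main class
      .setMaster("local[*]")
      .startApplication(new SparkAppHandle.Listener {
        // Driven by fireEvent(false) in the handle implementations above.
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"state -> ${h.getState}")
        // Driven by fireEvent(true), e.g. when the application id is set.
        override def infoChanged(h: SparkAppHandle): Unit =
          println(s"appId -> ${h.getAppId}")
      })

    // Poll until the application reaches a final state, then clean up.
    while (!handle.getState.isFinal) {
      Thread.sleep(1000)
    }
    handle.disconnect()
  }
}
```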