-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-17443][SPARK-11035] Stop Spark Application if launcher goes down and use reflection #15009
Conversation
Jenkins, test this please |
} | ||
|
||
def connect(port: Int, secret: String): Unit = { | ||
if (port != None && secret != None) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doing the same check here as line 46 above, don't need both.
Test build #65103 has finished for PR 15009 at commit
|
if (port != None && secret != None) { | ||
val s = new Socket(InetAddress.getLoopbackAddress(), port.get) | ||
if(stopFlag != None) { | ||
connect(port.get, secret.get, stopFlag.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we just set this.stopFlag here and have one connect method since that is all this one is doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore this I didn't see it being called from Client
@kishorvpatil Can you please describe in more detail what manual testing you did? |
private String appId; | ||
|
||
OutputRedirector redirector; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove extra blank newlines
Jenkins, test this please |
made a few comments but haven't made it all the way through yet, hopefully later this afternoon |
Test build #65108 has finished for PR 15009 at commit
|
Jenkins, this is okay to test |
Jenkins, test this please |
if (!disposed) { | ||
disconnect(); | ||
} | ||
if (childThread!= null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
space before !=
Test build #65150 has finished for PR 15009 at commit
|
childThread = null; | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove extra new line
Jenkins, test this please |
Test build #65166 has finished for PR 15009 at commit
|
@@ -71,6 +104,9 @@ private[spark] abstract class LauncherBackend { | |||
if (connection != null && lastState != state) { | |||
connection.send(new SetState(state)) | |||
lastState = state | |||
if (!_isConnected && stopFlag) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we still need this if its being done in close() now?
@vanzin I modified the test with sleep for SparkApp to run. |
.addAppArgs("thread"); | ||
final SparkAppHandle app = launcher.startApplication(); | ||
sleep(3000); | ||
assertEquals(false, app.getState().isFinal()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You still haven't fixed this. This is what I wrote the very first time I commented on this:
Hmm... you need to wait for the app to finish, or kill it eventually. And check that it actually ran.
You are not waiting for the app to finish. You're checking that app.getState().isFinal()
is false
here, so basically this test waits 3 seconds and still expects the app to be running after that.
You want exactly the opposite. You want the app to finish at some point, and not just finish, but finish successfully. That means checking for a specific status.
Test build #77263 has finished for PR 15009 at commit
|
@vanzin I added |
.addAppArgs("thread"); | ||
final SparkAppHandle app = launcher.startApplication(); | ||
sleep(3000); | ||
AbstractSparkAppHandle handle = (AbstractSparkAppHandle)app; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need this. SparkAppHandle
is a AbstractSparkAppHandle
, isn't it? You should also make waitFor()
package private instead of protected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waitFor
is a package private now. This method is added for unit test in AbstractSparkAppHandle
not part of the interface SparkAppHandle
.
final SparkAppHandle app = launcher.startApplication(); | ||
sleep(3000); | ||
AbstractSparkAppHandle handle = (AbstractSparkAppHandle)app; | ||
handle.waitFor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are you testing that the application actually succeeded instead of, e.g., throwing an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this unit test the main application is Simple application. There is no spark application running in background to track status, so no way to use handle.getState
. All I am checking is for launched thread/process is complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like there is a bug somewhere; if the app finishes without reporting state (either in child process or thread mode), the handle should change its state to some form of error (e.g. LOST
). Anyway that's unrelated to your changes, but please file a separate bug.
You can still check that the state is not FAILED
, right? Because if the child application throws an exception in thread mode, the handle should at least be updated to the failed state. (That would be a nice candidate for a new unit test, too.)
public static class SparkLauncherTestApp { | ||
|
||
public static void main(String[] args) throws Exception { | ||
assertEquals(1, args.length); | ||
assertEquals("proc", args[0]); | ||
assertEquals("bar", System.getProperty("foo")); | ||
if("proc".equalsIgnoreCase(args[0])) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing space after if
.
|
||
protected void waitFor() throws InterruptedException { | ||
if(this.childThread.isAlive()) { | ||
this.childThread.wait(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wait
and not join
? Who's doing a notify
on the thread to wake up this call?
} | ||
|
||
protected void waitFor() throws InterruptedException { | ||
if(this.childThread.isAlive()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing space after if
.
Test build #78058 has finished for PR 15009 at commit
|
retest this please |
Test build #78060 has finished for PR 15009 at commit
|
Test build #78064 has finished for PR 15009 at commit
|
@kishorvpatil are you planning to address the last bit of feedback remaining here? It shouldn't be that hard to make that test better. |
@kishorvpatil this will be quite useful for us! To avoid the 3s cost of spinning up a new jvm just for yarn-cluster |
I kinda gave up waiting for feedback to be addressed here. I'm fixing some things in the launcher library and will work on SPARK-11035 next, replacing part of this PR. |
That would be incredible. Launching a new jvm and loading all of hadoop takes about 4 seconds extra each time, versus reusing the launcher jvm, which is really significant for us since we launch a lot of jobs and users have to wait on this. |
…e External Shuffle Service
…e External Shuffle Service
What changes were proposed in this pull request?
SparkLauncher
providesstopIfInterrupted
method to allow ensuringSparkApplication
stops if tracking process dies.SparkLauncher
providesstartApplicationInProcess
method to start application using reflection in another Thread instead of launching childProcess and relying onspark-submit
binary.(Please fill in changes proposed in this fix)
How was this patch tested?
I ran following Manual tests:
startApplication
with request tostopIfInterrupted
, then send interrupt signal to the process.Result: The
childprocess
kills Spark Application and terminates.startApplicationInProcess
with request tostopIfInterrupted
, then send interrupt signal to the process.Result: The shutdownHook in LauncherBackend kills Spark Application and before termination.
startApplication
withoutstopIfInterrupted
, then send interrupt signal to the process.Result: The
childprocess
terminates without killing the Spark Application.startApplicationInProcess
withoutstopIfInterrupted
, then send interrupt signal to the process.Result: The process terminates without killing Spark Application.