[SPARK-21146] [CORE] Master/Worker should handle and shutdown when any thread gets UncaughtException #18357
Conversation
Could you elaborate on when we need this improvement? In fact, I don't see much benefit of having this at first glance.
@jiangxb1987 Thanks for looking at this. If any thread in the Worker gets an unhandled exception, that thread terminates while the process (Worker) keeps running, and the Worker no longer performs the terminated thread's functionality. One instance is mentioned in the JIRA description, where the dispatcher-event thread got an exception and terminated, but the Worker kept running without performing that thread's functionality. I think we should handle this at the process level for all threads: if any thread gets an unhandled exception, the process (Worker) should exit instead of running with partial functionality. This PR sets SparkUncaughtExceptionHandler as the default UncaughtExceptionHandler, so for any unhandled exception in any thread of the process, SparkUncaughtExceptionHandler handles the exception and exits the process with the appropriate exit code.
AFAIK SparkUncaughtExceptionHandler is designed to make Executors fail fast, because Executors are relatively lightweight, but I'm not sure we can afford to let Workers, or even the Master, fail fast too.
According to the JIRA, if the Worker hits some unrecoverable issue and its state becomes DEAD, it seems to make sense to terminate that Worker as it's no longer functional. But can you double-check that this change won't terminate a still-functional Worker?
The change doesn't impact or harm a fully functional Worker; SparkUncaughtExceptionHandler steps in only when a thread terminates due to an exception thrown from it without being handled.
ok to test |
Test build #78404 has finished for PR 18357 at commit
@zsxwing Thanks for looking into this. I see SparkUncaughtExceptionHandler exits the process for all exceptions and errors. Can we modify SparkUncaughtExceptionHandler so that it always kills the process for Errors, but kills the process for Exceptions only when a flag is true? The flag could be enabled for Executors and disabled for the other daemons (Master, Worker, and HistoryServer).
@devaraj-kavali SparkUncaughtExceptionHandler is a singleton, so you cannot get the configuration from SparkConf.
Can we change SparkUncaughtExceptionHandler to a class and create an instance in each daemon's main() with a constructor flag that controls whether to kill the process for Exceptions? Something like the below.

For the Executor:
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(true)) // true: kill the process for Exceptions

For the other daemons:
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false)) // false: don't kill the process for Exceptions

The flag decides whether to kill the process for unhandled Exceptions, with a default value of true; behaviour stays the same for unhandled Error instances.
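The flag semantics proposed above can be sketched as follows. This is a hypothetical, simplified stand-in, not Spark's actual class: ExitPolicyHandler and the injected exit function are illustration only, introduced so the behaviour can be exercised without terminating the JVM (the real handler calls System.exit), and the numeric exit codes mirror SparkExitCode's UNCAUGHT_EXCEPTION = 50 and OOM = 52.

```scala
// Hypothetical sketch of the class-based handler discussed above.
// Errors always terminate the process; Exceptions terminate it only
// when exitOnException is true (Executors), otherwise they are ignored
// here (daemons would log and keep running).
class ExitPolicyHandler(exitOnException: Boolean, exit: Int => Unit)
    extends Thread.UncaughtExceptionHandler {
  // Assumed values mirroring SparkExitCode
  private val UncaughtException = 50
  private val Oom = 52

  override def uncaughtException(thread: Thread, throwable: Throwable): Unit =
    throwable match {
      case _: OutOfMemoryError => exit(Oom)                // OOM is always fatal
      case _: Error            => exit(UncaughtException)  // other Errors: always fatal
      case _ if exitOnException => exit(UncaughtException) // Exceptions: fatal only when flagged
      case _ => ()                                         // daemons keep running
    }
}
```

With exitOnException = false, a RuntimeException leaves the process alive, while a LinkageError still triggers the exit path.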
Test build #78838 has finished for PR 18357 at commit
retest this please. |
Test build #78912 has finished for PR 18357 at commit
@@ -26,27 +26,34 @@ import org.apache.spark.internal.Logging
 */
private[spark] object SparkUncaughtExceptionHandler
  extends Thread.UncaughtExceptionHandler with Logging {
  private[this] var exitOnException = true
Instead of using a static variable, I prefer to change SparkUncaughtExceptionHandler to a class.
}
} catch {
  case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
Could you keep these codes? It unlikely happens but since the codes are there, it's better to not change it.
@zsxwing, this code is still there, but I moved the try/catch to the block where we invoke System.exit. Do you mean moving the whole code in uncaughtException() to the try block and adding the catch block?
+ } catch {
+ case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+ case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
Do you mean moving the whole code in uncaughtException() to try block and having the catch block?
Yes.
@zsxwing Thanks for the clarification.
Test build #78997 has finished for PR 18357 at commit
logError("[Process in shutdown] " + errMsg, exception)
} else if (exception.isInstanceOf[Error] ||
    (!exception.isInstanceOf[Error] && exitOnException)) {
  logError(errMsg + ". Shutting down now..", exception)
  if (exception.isInstanceOf[OutOfMemoryError]) {
    System.exit(SparkExitCode.OOM)
  } else {
    System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
The current changes are too much. Could you rename exitOnException to exitOnUncaughtException and just change this line to:
    if (exitOnUncaughtException) {
      System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
    }
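The exit-then-halt pattern discussed in the review above can be sketched like this. This is a hedged illustration, not Spark's actual code: exitOrHalt and the injectable exit/halt parameters are hypothetical, added so the control flow can be tested without terminating the JVM, and the numeric codes mirror SparkExitCode's assumed OOM = 52 and UNCAUGHT_EXCEPTION_TWICE = 51.

```scala
// Sketch of "try a clean exit, halt as a last resort". System.exit runs
// shutdown hooks and can itself throw; Runtime.halt terminates the JVM
// immediately, skipping shutdown hooks, so it is the fallback.
def exitOrHalt(code: Int,
               exit: Int => Unit = System.exit,
               halt: Int => Unit = Runtime.getRuntime.halt): Unit = {
  try {
    exit(code)
  } catch {
    case _: OutOfMemoryError => halt(52) // mirrors SparkExitCode.OOM
    case _: Throwable        => halt(51) // mirrors SparkExitCode.UNCAUGHT_EXCEPTION_TWICE
  }
}
```

If the exit path itself fails (for example a shutdown hook throws), the process is still guaranteed to die via halt rather than limp along.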
@@ -737,6 +737,7 @@ private[deploy] object Worker extends Logging {
  val ENDPOINT_NAME = "Worker"

  def main(argStrings: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
nit: false => exitOnException = false
 */
private[spark] object SparkUncaughtExceptionHandler
private[spark] class SparkUncaughtExceptionHandler(val exitOnException: Boolean = true)
nit: please also document exitOnException using @param exitOnException ...
@@ -1037,6 +1037,7 @@ private[deploy] object Master extends Logging {
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(false))
nit: false => exitOnException = false
LGTM. Pending tests. Thanks!
Test build #79298 has finished for PR 18357 at commit
@zsxwing Tests have passed, can you check this? Thanks
@devaraj-kavali Sorry, I forgot this PR. I will trigger a new run, as master has been updated a lot. Will set a reminder for myself to merge this PR :)
retest this please |
Test build #79537 has finished for PR 18357 at commit
retest this please |
Test build #79542 has finished for PR 18357 at commit
Thanks! Merging to master.
What changes were proposed in this pull request?
Adding the default UncaughtExceptionHandler to the Worker.
How was this patch tested?
I verified it manually: when any of the Worker threads gets an uncaught exception, the default UncaughtExceptionHandler handles it.
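As a rough illustration of what installing a default handler in a daemon's main() buys (not Spark's actual code; HandlerDemo is hypothetical and only records the failure instead of exiting), a default UncaughtExceptionHandler observes exceptions thrown from any thread that hasn't set its own handler:

```scala
// Hypothetical demo: install a process-wide default handler that records
// which thread died and why, instead of silently losing the thread.
object HandlerDemo {
  @volatile var failure: Option[(String, Throwable)] = None

  def install(): Unit =
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        failure = Some((t.getName, e))
    })
}
```

Without such a default handler, a thread like the dispatcher-event loop would die with only a stack trace on stderr while the process keeps running, which is exactly the partial-functionality situation this PR addresses.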