[SPARK-21146] [CORE] Master/Worker should handle and shutdown when any thread gets UncaughtException #18357

Closed
wants to merge 4 commits
Changes from 2 commits
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}

private[deploy] class Worker(
override val rpcEnv: RpcEnv,
@@ -737,6 +737,7 @@ private[deploy] object Worker extends Logging {
val ENDPOINT_NAME = "Worker"

def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler(false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
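The one-line change in `Worker.main` relies on the JVM-wide default handler: once installed via `Thread.setDefaultUncaughtExceptionHandler`, it sees every exception that escapes any thread's `run` method in the process. A minimal, self-contained Java sketch of that mechanism (illustrative names, not Spark code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class DefaultHandlerDemo {
    // Records what the default handler observed, so the flow is visible.
    static final AtomicReference<String> seen = new AtomicReference<>("");

    public static void main(String[] args) {
        // Install a process-wide default handler, as Worker.main now does.
        Thread.setDefaultUncaughtExceptionHandler((t, e) ->
            seen.set("Uncaught exception in thread " + t.getName() + ": " + e.getMessage()));

        Thread worker = new Thread(() -> { throw new RuntimeException("boom"); }, "worker-thread");
        worker.start();
        // The handler runs on the dying thread before it terminates, so after
        // join() returns the recorded message is guaranteed to be set.
        try { worker.join(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
        System.out.println(seen.get());
    }
}
```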
@@ -26,27 +26,34 @@ import org.apache.spark.internal.Logging
*/
private[spark] object SparkUncaughtExceptionHandler
extends Thread.UncaughtExceptionHandler with Logging {
+  private[this] var exitOnException = true
Member
Instead of using a static variable, I prefer to change SparkUncaughtExceptionHandler to a class.


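The reviewer's suggestion amounts to making `exitOnException` constructor state on a per-instance handler instead of a mutable field on a singleton. A hedged Java sketch of that shape (illustrative names, not the actual Spark implementation):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ClassHandlerDemo {
    // Records what the handler saw, so behavior is observable without exiting the JVM.
    static final AtomicReference<String> lastMessage = new AtomicReference<>("");

    // Each instance carries its own immutable flag -- no shared mutable state,
    // unlike a var on a singleton object mutated through apply().
    static class ExitingHandler implements Thread.UncaughtExceptionHandler {
        private final boolean exitOnException;
        ExitingHandler(boolean exitOnException) { this.exitOnException = exitOnException; }

        @Override public void uncaughtException(Thread t, Throwable e) {
            lastMessage.set("thread=" + t.getName() + " exitOnException=" + exitOnException);
            // A real handler would call System.exit here when exitOnException is true.
        }
    }

    public static void main(String[] args) {
        Thread w = new Thread(() -> { throw new IllegalStateException("boom"); }, "w1");
        w.setUncaughtExceptionHandler(new ExitingHandler(false));
        w.start();
        try { w.join(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
        System.out.println(lastMessage.get());
    }
}
```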
-  override def uncaughtException(thread: Thread, exception: Throwable) {
-    try {
-      // Make it explicit that uncaught exceptions are thrown when container is shutting down.
-      // It will help users when they analyze the executor logs
-      val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else ""
-      val errMsg = "Uncaught exception in thread "
-      logError(inShutdownMsg + errMsg + thread, exception)
+  def apply(exitOnException: Boolean): Thread.UncaughtExceptionHandler = {
+    this.exitOnException = exitOnException
+    this
+  }

-    // We may have been called from a shutdown hook. If so, we must not call System.exit().
-    // (If we do, we will deadlock.)
-    if (!ShutdownHookManager.inShutdown()) {
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    // Make it explicit that uncaught exceptions are thrown when process is shutting down.
+    // It will help users when they analyze the executor logs
+    val errMsg = "Uncaught exception in thread " + thread
+    if (ShutdownHookManager.inShutdown()) {
+      logError("[Process in shutdown] " + errMsg, exception)
+    } else if (exception.isInstanceOf[Error] ||
+        (!exception.isInstanceOf[Error] && exitOnException)) {
+      try {
+        logError(errMsg + ". Shutting down now..", exception)
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(SparkExitCode.OOM)
+        } else {
+          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
Member
The current changes are too much. Could you rename exitOnException to exitOnUncaughtException and just change this line to

if (exitOnUncaughtException) {
  System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
}

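The branch under discussion just maps the exception type to an exit status. A small Java sketch of that mapping, with the constants written out locally (the values are assumed to mirror Spark's `SparkExitCode` object; treat them as illustrative):

```java
public class ExitCodeDemo {
    // Assumed to mirror org.apache.spark.util.SparkExitCode; not authoritative.
    static final int OOM = 52;
    static final int UNCAUGHT_EXCEPTION = 50;

    // Pure function: the status the handler would pass to System.exit.
    static int exitCodeFor(Throwable t) {
        return (t instanceof OutOfMemoryError) ? OOM : UNCAUGHT_EXCEPTION;
    }

    public static void main(String[] args) {
        System.out.println(exitCodeFor(new OutOfMemoryError()));      // 52
        System.out.println(exitCodeFor(new IllegalStateException())); // 50
    }
}
```

Keeping the mapping a pure function of the exception type makes the exit path easy to unit-test without actually terminating the JVM.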
+        }
+      } catch {
+        case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+        case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
+      }
-    } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
Member
Could you keep this code? It's unlikely to happen, but since the code is there, it's better not to change it.
Author
@zsxwing, this code is still there, but I moved the try/catch to the block where we invoke System.exit. Do you mean moving the whole code in uncaughtException() into the try block and having the catch block?
+      } catch {
+        case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+        case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)

Member

> Do you mean moving the whole code in uncaughtException() to try block and having the catch block?

Yes.
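The structure being agreed on here: the whole handler body sits inside one try, and the catch halts the VM if the handler itself fails (`Runtime.halt` skips shutdown hooks, so it cannot deadlock the way `System.exit` can from inside a hook). A Java sketch with the exit/halt calls replaced by a recording stub so the control flow is testable:

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerFlowDemo {
    // Assumed to mirror SparkExitCode; illustrative values.
    static final int OOM = 52, UNCAUGHT = 50, TWICE = 51;
    static final List<String> actions = new ArrayList<>();

    // failInsideHandler simulates the handler itself throwing (e.g. while logging).
    static void handle(Thread t, Throwable e, boolean failInsideHandler) {
        try {
            actions.add("log: Uncaught exception in thread " + t.getName());
            if (failInsideHandler) throw new RuntimeException("handler broke");
            // In the real handler these would be System.exit calls.
            actions.add("exit:" + (e instanceof OutOfMemoryError ? OOM : UNCAUGHT));
        } catch (OutOfMemoryError oom) {
            actions.add("halt:" + OOM);    // stands in for Runtime.getRuntime().halt(OOM)
        } catch (Throwable second) {
            actions.add("halt:" + TWICE);  // stands in for halt(UNCAUGHT_EXCEPTION_TWICE)
        }
    }

    public static void main(String[] args) {
        handle(Thread.currentThread(), new RuntimeException("boom"), false); // normal path
        handle(Thread.currentThread(), new RuntimeException("boom"), true);  // handler fails
        System.out.println(actions);
    }
}
```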

Author
@zsxwing Thanks for the clarification.

-      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    } else {
+      logError(errMsg, exception)
+    }
}
