add uncaught exception handler to the driver
Stavros Kontopoulos committed Jun 4, 2019
1 parent 6748b48 commit affcb04
Showing 4 changed files with 35 additions and 5 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -861,6 +861,16 @@ private[spark] class SparkSubmit extends Logging {
   * running cluster deploy mode or python applications.
   */
  private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // Set the handler as early as possible. The driver check is needed because
    // spark-submit is also used by Mesos to launch executors (via the SparkLauncher).
    val isExecutor = args.childArgs.contains("--executor-id")
    args.deployMode match {
      case "client" | null if !isExecutor =>
        Thread.setDefaultUncaughtExceptionHandler(
          new SparkUncaughtExceptionHandler(isDriver = true))
      case _ => // cluster deploy mode or executor launch: keep the JVM default handler
    }

    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
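For context on the JVM hook being used here: Thread.setDefaultUncaughtExceptionHandler installs a process-wide handler that receives any Throwable escaping a thread that has no handler of its own, which is why the commit installs it before the application's main class can start spawning threads. A minimal, self-contained sketch of that behavior (not Spark code; HandlerDemo and the exit code are illustrative, Scala 2.12+ for the Runnable lambda):

```scala
// Sketch of default-handler semantics only; names and exit code are illustrative.
object HandlerDemo {
  def main(args: Array[String]): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        // Invoked for any thread that dies with an unhandled Throwable,
        // including threads created after this handler was installed.
        System.err.println(s"Uncaught in ${t.getName}: $e")
        System.exit(50) // hypothetical code; Spark uses SparkExitCode values
      }
    })
    new Thread(() => throw new RuntimeException("boom"), "worker").start()
  }
}
```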
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -45,7 +45,7 @@ private[spark] object ShutdownHookManager extends Logging {
   */
  val TEMP_DIR_SHUTDOWN_PRIORITY = 25

  private lazy val shutdownHooks = {
  private[spark] lazy val shutdownHooks = {
    val manager = new SparkShutdownHookManager()
    manager.install()
    manager
@@ -204,6 +204,11 @@ private [util] class SparkShutdownHookManager {
    hooks.synchronized { hooks.remove(ref) }
  }

  /**
   * Drop all registered hooks without running them. Used on fatal VM errors,
   * where running cleanup in a possibly broken JVM could hang the exit.
   */
  def clear(): Unit = {
    hooks.synchronized {
      hooks.clear()
    }
  }
}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
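Why expose clear() at all: System.exit() triggers the JVM shutdown sequence, during which Spark's registered hooks (temp-dir cleanup and the like) would normally run. After an OutOfMemoryError those hooks may themselves allocate or block, so the handler empties Spark's hook queue first and the exit proceeds with nothing left to run. A small sketch of the underlying JVM behavior being avoided (not Spark code):

```scala
// Sketch (not Spark code): System.exit() runs registered JVM shutdown hooks
// before the process terminates, which is exactly the work clear() sidesteps.
object ShutdownHookDemo {
  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      // Stand-in for cleanup such as deleting temp directories; after an OOM,
      // work like this may itself fail or hang, delaying the exit.
      println("shutdown hook running")
    }))
    println("exiting")
    System.exit(0) // "shutdown hook running" prints before the process dies
  }
}
```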
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -29,4 +29,6 @@ private[spark] object SparkExitCode {
      OutOfMemoryError. */
  val OOM = 52

  /** The default uncaught exception handler was reached, and the uncaught exception was a
      VirtualMachineError other than an OutOfMemoryError (which exits with OOM above). */
  val VM_ERROR = 53

}
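These constants exist so that whatever supervises the driver process can tell failure modes apart from the exit status alone. A hedged sketch of how a launcher might branch on them ("app.jar" and "Main" are placeholders, and the wrapper object is illustrative):

```scala
// Sketch: branch on a child JVM's exit status; command-line pieces are placeholders.
import scala.sys.process._

object ExitCodeCheck {
  def main(args: Array[String]): Unit = {
    // Runs the child process, waits for it, and returns its exit status.
    val status = Process(Seq("java", "-cp", "app.jar", "Main")).!
    status match {
      case 52 => println("child died with OutOfMemoryError (SparkExitCode.OOM)")
      case 53 => println("child died with a VirtualMachineError (SparkExitCode.VM_ERROR, added here)")
      case n  => println(s"child exited with status $n")
    }
  }
}
```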
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -25,9 +25,16 @@ import org.apache.spark.internal.Logging
 *
 * @param exitOnUncaughtException Whether to exit the process on UncaughtException.
 * @param isDriver Whether this handler is installed in the driver; if so, Spark's
 *                 shutdown hooks are cleared before exiting on OOM and other VM errors.
 */
private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
private[spark] class SparkUncaughtExceptionHandler(
    val exitOnUncaughtException: Boolean = true,
    isDriver: Boolean = false)
  extends Thread.UncaughtExceptionHandler with Logging {

  private def sysExit(error: Int, clearHooks: Boolean = false): Unit = {
    // Clearing Spark's shutdown hooks first lets System.exit() terminate promptly
    // instead of running cleanup in a JVM that may no longer be healthy.
    if (clearHooks) ShutdownHookManager.shutdownHooks.clear()
    System.exit(error)
  }

  override def uncaughtException(thread: Thread, exception: Throwable) {
    try {
      // Make it explicit that uncaught exceptions are thrown when container is shutting down.
@@ -41,13 +48,19 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException:
      if (!ShutdownHookManager.inShutdown()) {
        exception match {
          case _: OutOfMemoryError =>
            System.exit(SparkExitCode.OOM)
            // Exit immediately: there is no room for a graceful shutdown here,
            // and we cannot assume the JVM is still healthy.
            sysExit(SparkExitCode.OOM, isDriver)
          case e: SparkFatalException if e.throwable.isInstanceOf[OutOfMemoryError] =>
            // SPARK-24294: This is defensive code, in case that SparkFatalException is
            // misused and uncaught.
            System.exit(SparkExitCode.OOM)
            sysExit(SparkExitCode.OOM, isDriver)
          case _: VirtualMachineError if exitOnUncaughtException =>
            // Exit immediately: there is no room for a graceful shutdown here,
            // and we cannot assume the JVM is still healthy.
            sysExit(SparkExitCode.VM_ERROR, isDriver)
          case _ if exitOnUncaughtException =>
            System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
            sysExit(SparkExitCode.UNCAUGHT_EXCEPTION)
        }
      }
    } catch {
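Putting the pieces together, the handler's dispatch can be summarized as a pure function. The sketch below mirrors the match above without killing the current JVM; it omits the SparkFatalException case and the in-shutdown check, and the value 50 for UNCAUGHT_EXCEPTION comes from Spark's existing SparkExitCode, not this diff:

```scala
// Illustrative harness, not Spark code: returns the exit code the handler would use.
object DispatchSketch {
  def decide(e: Throwable, exitOnUncaughtException: Boolean = true): Option[Int] = e match {
    case _: OutOfMemoryError =>
      Some(52) // SparkExitCode.OOM; in the driver, shutdown hooks are cleared first
    case _: VirtualMachineError if exitOnUncaughtException =>
      Some(53) // SparkExitCode.VM_ERROR
    case _ if exitOnUncaughtException =>
      Some(50) // SparkExitCode.UNCAUGHT_EXCEPTION (from Spark's existing constants)
    case _ =>
      None // e.g. a process configured not to exit: log and keep running
  }

  def main(args: Array[String]): Unit = {
    assert(decide(new OutOfMemoryError()) == Some(52))
    assert(decide(new StackOverflowError()) == Some(53)) // StackOverflowError <: VirtualMachineError
    assert(decide(new RuntimeException("boom")) == Some(50))
    assert(decide(new RuntimeException("boom"), exitOnUncaughtException = false).isEmpty)
    println("dispatch sketch OK")
  }
}
```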
