From affcb04b3e20a9b151668c5c82408b1dac6d6d72 Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Tue, 4 Jun 2019 23:31:58 +0300
Subject: [PATCH] Add uncaught exception handler to the driver

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 11 ++++++++++
 .../spark/util/ShutdownHookManager.scala      |  7 ++++++-
 .../org/apache/spark/util/SparkExitCode.scala |  3 +++
 .../util/SparkUncaughtExceptionHandler.scala  | 21 +++++++++++++++----
 4 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 926e2dfd34d36..e372212001585 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -861,6 +861,17 @@ private[spark] class SparkSubmit extends Logging {
    * running cluster deploy mode or python applications.
    */
   private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
+    // Set the handler as early as possible. The driver check is needed because
+    // spark-submit is also used by Mesos to launch executors (via the
+    // SparkLauncher).
+    val isExecutor = args.childArgs.contains("--executor-id")
+    args.deployMode match {
+      case "client" | null if !isExecutor =>
+        Thread.setDefaultUncaughtExceptionHandler(
+          new SparkUncaughtExceptionHandler(isDriver = true))
+      case _ => // cluster mode or an executor launch: keep the default handler
+    }
     val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
     // Let the main class re-initialize the logging system once it starts.
     if (uninitLog) {
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index b702838fa257f..a6c803f89a190 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -45,7 +45,7 @@ private[spark] object ShutdownHookManager extends Logging {
    */
   val TEMP_DIR_SHUTDOWN_PRIORITY = 25
 
-  private lazy val shutdownHooks = {
+  private[spark] lazy val shutdownHooks = {
     val manager = new SparkShutdownHookManager()
     manager.install()
     manager
@@ -204,6 +204,11 @@ private [util] class SparkShutdownHookManager {
     hooks.synchronized { hooks.remove(ref) }
   }
 
+  def clear(): Unit = {
+    hooks.synchronized {
+      hooks.clear()
+    }
+  }
 }
 
 private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
index c93b1cca9f564..c178338a2df4e 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -29,4 +29,7 @@ private[spark] object SparkExitCode {
       OutOfMemoryError. */
   val OOM = 52
 
+  /** Uncaught VirtualMachineError. */
+  val VM_ERROR = 53
+
 }
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 1b34fbde38cd6..a282ef7508aab 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -25,9 +25,16 @@ import org.apache.spark.internal.Logging
  *
  * @param exitOnUncaughtException Whether to exit the process on UncaughtException.
  */
-private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
+private[spark] class SparkUncaughtExceptionHandler(
+    val exitOnUncaughtException: Boolean = true,
+    isDriver: Boolean = false)
   extends Thread.UncaughtExceptionHandler with Logging {
 
+  private def sysExit(error: Int, clearHooks: Boolean = false): Unit = {
+    if (clearHooks) ShutdownHookManager.shutdownHooks.clear()
+    System.exit(error)
+  }
+
   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
       // Make it explicit that uncaught exceptions are thrown when container is shutting down.
@@ -41,13 +48,19 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException:
       if (!ShutdownHookManager.inShutdown()) {
         exception match {
           case _: OutOfMemoryError =>
-            System.exit(SparkExitCode.OOM)
+            // Exit immediately: there is no room for a graceful shutdown here,
+            // since the JVM can no longer be assumed to be healthy.
+            sysExit(SparkExitCode.OOM, isDriver)
           case e: SparkFatalException if e.throwable.isInstanceOf[OutOfMemoryError] =>
             // SPARK-24294: This is defensive code, in case that SparkFatalException is
             // misused and uncaught.
-            System.exit(SparkExitCode.OOM)
+            sysExit(SparkExitCode.OOM, isDriver)
+          case _: VirtualMachineError if exitOnUncaughtException =>
+            // As with OutOfMemoryError, exit immediately: the JVM may be in an
+            // arbitrarily broken state after a VirtualMachineError.
+            sysExit(SparkExitCode.VM_ERROR, isDriver)
           case _ if exitOnUncaughtException =>
-            System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
+            sysExit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
     } catch {
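Note for reviewers (not part of the committed change): the pattern above is easy to try outside Spark. The sketch below, assuming a plain JVM application that wants the same behavior, installs a process-wide default handler early and exits via Runtime.halt on a VirtualMachineError, which skips shutdown hooks much as the patch clears Spark's hooks before System.exit. The object name, the haltOnFatalError flag, and the literal exit codes are illustrative only; they mirror SparkExitCode but are not Spark API.

object UncaughtHandlerSketch {
  private val UNCAUGHT_EXCEPTION = 50 // mirrors SparkExitCode.UNCAUGHT_EXCEPTION
  private val OOM = 52                // mirrors SparkExitCode.OOM
  private val VM_ERROR = 53           // mirrors the VM_ERROR added by this patch

  def install(haltOnFatalError: Boolean): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = e match {
        // OutOfMemoryError is a VirtualMachineError, so match it first.
        case _: OutOfMemoryError if haltOnFatalError =>
          Runtime.getRuntime.halt(OOM)
        case _: VirtualMachineError if haltOnFatalError =>
          // halt() terminates without running shutdown hooks, analogous to
          // clearing the hooks before System.exit in the patch.
          Runtime.getRuntime.halt(VM_ERROR)
        case _ =>
          // A plain exit still runs shutdown hooks, keeping cleanup graceful.
          System.exit(UNCAUGHT_EXCEPTION)
      }
    })
  }

  def main(args: Array[String]): Unit = {
    install(haltOnFatalError = true)
    // Escapes main, reaches the default handler; the process exits with 50.
    throw new RuntimeException("boom")
  }
}

Skipping the hooks matters because a JVM that has thrown a VirtualMachineError may deadlock or corrupt state while running shutdown hooks; the patch achieves the same effect by clearing the hooks so that System.exit has nothing left to run.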