[SPARK-21146][CORE] Master/Worker should handle and shutdown when any thread gets UncaughtException

## What changes were proposed in this pull request?

Adding the default UncaughtExceptionHandler to the Master and Worker.

## How was this patch tested?

I verified it manually: when any of the Worker's threads throws an uncaught exception, the default UncaughtExceptionHandler now handles it.
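
For illustration of the mechanism relied on here (not part of the patch): `Thread.setDefaultUncaughtExceptionHandler` installs a process-wide handler that receives any exception escaping a thread's `run` method, which is what lets a handler registered in `main()` see failures from arbitrary daemon threads. A minimal standalone sketch:

```scala
object UncaughtExceptionDemo {
  def main(args: Array[String]): Unit = {
    // Install a process-wide default handler, as Master and Worker now do in main().
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        // A real daemon would log here and decide whether to call System.exit.
        System.err.println(s"Uncaught exception in thread ${t.getName}: $e")
      }
    })

    // Any thread that throws without catching ends up in the handler above.
    val worker = new Thread(new Runnable {
      override def run(): Unit = throw new IllegalStateException("boom")
    }, "demo-thread")
    worker.start()
    worker.join()
  }
}
```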

Author: Devaraj K <devaraj@apache.org>

Closes #18357 from devaraj-kavali/SPARK-21146.
Devaraj K authored and zsxwing committed Jul 12, 2017
1 parent 24367f2 commit e16e8c7
Showing 6 changed files with 17 additions and 10 deletions.

Master.scala

@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}

 private[deploy] class Master(
     override val rpcEnv: RpcEnv,
@@ -1045,6 +1045,8 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"

   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)

Worker.scala

@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}

 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -737,6 +737,8 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"

   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
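
Taken together with the SparkUncaughtExceptionHandler change below, the `exitOnUncaughtException = false` argument means an ordinary uncaught exception in a Master or Worker thread is logged by the handler instead of killing the daemon, while an `OutOfMemoryError` still terminates the process. Executor and MesosClusterDispatcher construct the handler with the default of `true` and keep the existing fail-fast behavior.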

Executor.scala

@@ -56,7 +56,7 @@ private[spark] class Executor(
     env: SparkEnv,
     userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false,
-    uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
   extends Logging {

   logInfo(s"Starting executor ID $executorId on host $executorHostname")

SparkUncaughtExceptionHandler.scala

@@ -20,11 +20,12 @@ package org.apache.spark.util
 import org.apache.spark.internal.Logging

 /**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. It terminates the whole process for
+ * any Errors, and also terminates the process for Exceptions when the exitOnException flag is true.
+ *
+ * @param exitOnUncaughtException Whether to exit the process on UncaughtException.
  */
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
   extends Thread.UncaughtExceptionHandler with Logging {

   override def uncaughtException(thread: Thread, exception: Throwable) {
@@ -40,7 +41,7 @@ private[spark] object SparkUncaughtExceptionHandler
       if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
-        } else {
+        } else if (exitOnUncaughtException) {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
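
Putting the two hunks together, the handler's exit policy after this change is roughly as follows. This is a simplified sketch, not the actual Spark source: the real class also logs the failure and skips `System.exit` when the JVM is already running shutdown hooks, and the numeric exit codes below are placeholders for the constants in `SparkExitCode`.

```scala
// Simplified sketch of the post-change exit policy; not the actual Spark implementation.
class ExitPolicySketch(exitOnUncaughtException: Boolean = true)
  extends Thread.UncaughtExceptionHandler {

  private val OOM = 52                 // placeholder for SparkExitCode.OOM
  private val UNCAUGHT_EXCEPTION = 50  // placeholder for SparkExitCode.UNCAUGHT_EXCEPTION

  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    if (exception.isInstanceOf[OutOfMemoryError]) {
      System.exit(OOM)                 // Errors like OOM always terminate the process
    } else if (exitOnUncaughtException) {
      System.exit(UNCAUGHT_EXCEPTION)  // default: fail fast, as Executors did before
    }
    // Otherwise (Master/Worker pass exitOnUncaughtException = false): keep the daemon running.
  }
}
```
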
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -76,6 +76,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()

+  private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala
@@ -1274,7 +1276,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
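
The Utils hunk above shows the calling pattern: non-control throwables caught inside a helper are forwarded to a default-configured handler instance, which then applies the exit policy. A minimal standalone sketch of that pattern follows; the enclosing helper's name is not visible in the hunk, so `runOrForward` below is a hypothetical stand-in, and a plain `Thread.UncaughtExceptionHandler` is used since `SparkUncaughtExceptionHandler` is `private[spark]`.

```scala
import scala.util.control.ControlThrowable

object ForwardToHandlerSketch {
  // Stand-in for Utils' private, default-configured handler instance.
  private val handler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit =
      System.err.println(s"Forwarded to handler from ${t.getName}: $e")
  }

  // Hypothetical wrapper mirroring the catch block in the Utils.scala hunk above.
  def runOrForward(block: => Unit): Unit = {
    try {
      block
    } catch {
      case e: ControlThrowable => throw e  // let Scala control-flow throwables propagate
      case t: Throwable => handler.uncaughtException(Thread.currentThread(), t)
    }
  }
}
```
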

MesosClusterDispatcher.scala

@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
   with CommandLineUtils {

   override def main(args: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
     val conf = new SparkConf
     val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
