[SPARK-27900][CORE] Add uncaught exception handler to the driver #24796

Closed (wants to merge 1 commit)
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -861,6 +861,16 @@ private[spark] class SparkSubmit extends Logging {
* running cluster deploy mode or python applications.
*/
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// Set the handler as early as possible. The driver check is needed because
// spark-submit is also used by Mesos to launch executors (via the SparkLauncher).
val isExecutor = args.childArgs.contains("--executor-id")
@skonto (Contributor, Author) commented on Jun 4, 2019:
Not the best, but at this point this is what is available (the alternative is to add it on the SparkContext side, where SparkEnv provides the info on whether we are in the driver JVM).
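
A hypothetical sketch (not part of this diff) of the SparkContext-side alternative mentioned above, assuming SparkEnv is already initialized at that point:

    // Inside SparkContext initialization, SparkEnv already knows whether this
    // JVM is the driver, so no --executor-id sniffing would be needed.
    val isDriverJvm = SparkEnv.get != null &&
      SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER
    if (isDriverJvm) {
      Thread.setDefaultUncaughtExceptionHandler(
        new SparkUncaughtExceptionHandler(isDriver = true))
    }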

args.deployMode match {
case "client" | null if !isExecutor =>
Thread.setDefaultUncaughtExceptionHandler(
A Member commented:

I remember there was an argument that we should not set the default UncaughtExceptionHandler for the driver, since Spark doesn't control the whole driver JVM and we should not just decide for the user what should be done in this JVM. They may have their own UncaughtExceptionHandler.
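
One hypothetical way to address that concern (a sketch, not in this PR): capture any user-installed default handler and delegate to it before Spark's own exit logic runs:

    // Wraps the user's handler (if any) so it still runs before the JVM exits.
    val previous = Option(Thread.getDefaultUncaughtExceptionHandler)
    Thread.setDefaultUncaughtExceptionHandler((t: Thread, e: Throwable) => {
      previous.foreach(_.uncaughtException(t, e))
      System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) // 50
    })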

@skonto (Contributor, Author) commented on Jun 5, 2019:
There should be documentation or something about what is provided (because Spark on K8s is affected, for example), but even if the user adds a handler, there will be a deadlock here (which could be fixed, I guess). In general I don't see people setting one, and if the same code is implemented in Python (which calls the JVM stuff behind the scenes), you need a default handler in that case, I guess. My 2 cents.

@zsxwing (Member) commented on Jun 6, 2019:
Yeah, I was actually supporting this, since if a Spark thread crashes, SparkContext and other components probably will not work correctly. I could not find the related PR or JIRA ticket, though...

One alternative solution is just setting up an UncaughtExceptionHandler for all Spark-created threads. Then this will not impact any other user threads.
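
A minimal sketch of that alternative (hypothetical, not in this PR): a ThreadFactory that installs a per-thread handler, so only Spark-created threads are affected and user threads keep whatever behavior they had:

    import java.util.concurrent.{Executors, ThreadFactory}
    import java.util.concurrent.atomic.AtomicInteger

    // Threads built by this factory get a per-thread handler; user threads are untouched.
    class HandlerInstallingThreadFactory(prefix: String) extends ThreadFactory {
      private val counter = new AtomicInteger(0)
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)
        t.setUncaughtExceptionHandler((th: Thread, e: Throwable) => {
          System.err.println(s"Uncaught exception in ${th.getName}: $e")
          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) // 50
        })
        t
      }
    }

    // Usage: pools created with this factory inherit the handler.
    val pool = Executors.newFixedThreadPool(4, new HandlerInstallingThreadFactory("dag-scheduler"))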

@skonto (Contributor, Author) commented on Jun 7, 2019:

This is currently done only for the FsHistoryProvider (unrelated to Spark core):

    initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
      (_: Thread, e: Throwable) => {
        logError("Error initializing FsHistoryProvider.", e)
        System.exit(1)
      }))

I am not sure how this will help as we will have to exit from there anyway, no?

new SparkUncaughtExceptionHandler(isDriver = true))
}

val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// Let the main class re-initialize the logging system once it starts.
if (uninitLog) {
core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -45,7 +45,7 @@ private[spark] object ShutdownHookManager extends Logging {
*/
val TEMP_DIR_SHUTDOWN_PRIORITY = 25

private lazy val shutdownHooks = {
private[spark] lazy val shutdownHooks = {
val manager = new SparkShutdownHookManager()
manager.install()
manager
@@ -204,6 +204,11 @@ private [util] class SparkShutdownHookManager {
hooks.synchronized { hooks.remove(ref) }
}

def clear(): Unit = {
A Member commented:
I'm still not clear on why you want to forget all the hooks that were registered -- what's the scenario where they prevent termination, and can we fix the hook? It seems like we'd want them to try to complete.

@skonto (Contributor, Author) commented on Jun 4, 2019:

The scenario is described in the JIRA. I see a deadlock. The DAG event loop thread dies because it tries to schedule a big number of tasks, gets an OOM, and becomes unresponsive (it never wakes from the interrupt). I can point to the full jstack output if we need to dig further into this.
Then the shutdown hook blocks without the DAG scheduler being able to finish. The scheduler stop logic waits here.

In general I am not confident we can predict the status of the JVM in case of an OOM, and what can go wrong, in order to do a safe shutdown; that is why an immediate exit is one good option (maybe not the only one).

A Member commented:
Is it a deadlock? I understand just exiting from the driver anyway if an uncaught exception happens; I'm just not so clear on why one would remove the hooks. If they execute, good. Or are you saying they're the issue? If so, is there a fix for the hook instead? I'm not sure it's better to not execute them.

@skonto (Contributor, Author) commented on Jun 4, 2019:
I tried to fix it, but the event loop thread will not get interrupted, so it cannot complete the join (btw, it's a daemon thread). I will paste the jstack output shortly so you can have a look; that is my view so far.

@skonto (Contributor, Author) commented on Jun 4, 2019:
@srowen here it is: https://gist.github.com/skonto/74181e434a727901d4f3323461c1050b
I commented out the clear call. One other (independent) thing I noticed is that the main thread is also stuck here:

ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)

Blocking forever there might be a problem if something goes wrong.
Now if you check the output:

"Thread-1" #10 prio=5 os_prio=0 tid=0x000055d323902000 nid=0x7c in Object.wait() [0x00007fdccd08a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	- locked <0x00000000ebe00e50> (a org.apache.spark.util.EventLoop$$anon$1)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2100)

This will never finish; it waits at the join (EventLoop.scala:81) for dag-scheduler-event-loop:

"dag-scheduler-event-loop" #45 daemon prio=5 os_prio=0 tid=0x000055d323a25000 nid=0x48 in Object.wait() [0x00007fdccd6d2000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	- locked <0x00000000eb4f3b58> (a org.apache.hadoop.util.ShutdownHookManager$1)
	at java.lang.Thread.join(Thread.java:1326)
	at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:107)
	at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
	at java.lang.Shutdown.runHooks(Shutdown.java:123)
	at java.lang.Shutdown.sequence(Shutdown.java:167)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- locked <0x00000000eb3848b8> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Runtime.exit(Runtime.java:109)
	at java.lang.System.exit(System.java:971)
	at org.apache.spark.util.SparkUncaughtExceptionHandler.sysExit(SparkUncaughtExceptionHandler.scala:35)
	at org.apache.spark.util.SparkUncaughtExceptionHandler.uncaughtException(SparkUncaughtExceptionHandler.scala:53)
	at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
	at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
	at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)

which waits for the shutdown hook to finish, which in turn was invoked by the OOM that this very thread produced. So it's a deadlock. If you check the log, the OOM comes from that thread, which tries to submit 1M tasks ;). dag-scheduler-event-loop -> shutdown hook -> calls join from the other thread and waits for dag-scheduler-event-loop (deadlock).
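
A minimal, self-contained sketch (hypothetical, not from this PR) of the same self-join cycle: a thread calls System.exit, whose shutdown hook then joins that very thread, so each side waits on the other forever:

    object ShutdownJoinDeadlock {
      def main(args: Array[String]): Unit = {
        val worker = new Thread(() => {
          // Stands in for dag-scheduler-event-loop hitting a fatal error:
          // System.exit runs the shutdown hooks from *this* thread's call stack.
          System.exit(1)
        }, "event-loop")
        // Stands in for EventLoop.stop()'s join, reached via a shutdown hook.
        Runtime.getRuntime.addShutdownHook(new Thread(() => worker.join(), "stop-hook"))
        worker.start()
      }
    }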

@zsxwing (Member) commented:
> I tried to fix it, but the event loop thread will not get interrupted, so it cannot complete the join (btw, it's a daemon thread). I will paste the jstack output shortly so you can have a look; that is my view so far.

Do you know why the event loop thread won't get interrupted? I tried to reproduce the deadlock locally using this change zsxwing@b1aecf2, but I could not reproduce the following hang:

"Thread-1" #10 prio=5 os_prio=0 tid=0x000055d323902000 nid=0x7c in Object.wait() [0x00007fdccd08a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	- locked <0x00000000ebe00e50> (a org.apache.spark.util.EventLoop$$anon$1)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2100)

but saw a different behavior: the JVM just exits when reaching eventThread.join(), as I don't see any output after eventThread.join(). Here is all the output I got:

scala> sc.parallelize(1 to 10).count()
[Stage 0:>                                                          (0 + 8) / 8]19/06/05 12:52:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[dag-scheduler-event-loop,5,main]
java.lang.OutOfMemoryError: foo
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1344)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2172)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2113)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
stopping Thread[dag-scheduler-event-loop,5,main] in Thread[Thread-1,5,main]
waiting for Thread[dag-scheduler-event-loop,5,main] in Thread[Thread-1,5,main]

My JDK version is 1.8.0_171. Maybe Thread.join in a shutdown hook behaves differently in different JDK versions.

@skonto (Contributor, Author) commented on Jun 7, 2019:

@zsxwing interesting... I use the JVM in the Alpine docker image used for Spark on K8s, openjdk:8-alpine:

sh-4.4$ java -version
openjdk version "1.8.0_201"
OpenJDK Runtime Environment (IcedTea 3.11.0) (Alpine 8.201.08-r1)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)

which is the latest...
Btw, from what I read, there is no guarantee shutdown hooks will run to completion (https://dzone.com/articles/know-jvm-series-2-shutdown); that is why I thought just exiting is the most reliable option, and that is why I cleared the hooks, although this is not satisfying in every case.
"First thing to keep in mind is that it is not guaranteed that shutdown hooks will always run. If the JVM crashes due to some internal error, then it might crash down without having a chance to execute a single instruction."
In addition, even if this does not happen often, shutdown hooks need to finish fast anyway so you stand a chance of completing (especially with JVM errors), and waiting in a join when you don't know the state of things is dangerous anyway.
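
For comparison, a sketch (an assumption on my part, not something this PR does) of another way to get the "exit without running hooks" effect that clear() + System.exit() approximates:

    // Runtime.halt forcibly terminates the JVM without running shutdown hooks
    // or finalizers, so a stuck hook can never block the exit.
    Runtime.getRuntime.halt(SparkExitCode.OOM)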

A Contributor commented:

@zsxwing it's possible your change is not triggering the same thing, because throwing an OOM from within Java code is not the same as an OOM from within the JVM itself (IIRC, when the user throws an OOM themselves, none of the JVM -XX options related to OOMs get triggered).
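
A hypothetical illustration of that distinction: a user-thrown OutOfMemoryError propagates like any other Throwable, while a genuine allocation failure is detected by the VM itself and (to my understanding) is what flags like -XX:OnOutOfMemoryError react to:

    object OomKinds {
      // What the test commit simulates: just a Throwable, raised from user code.
      def thrownOom(): Unit = throw new OutOfMemoryError("foo")

      // Genuinely exhausts the heap; run with a small -Xmx to observe the difference.
      def realOom(): Unit = {
        val sink = scala.collection.mutable.ArrayBuffer.empty[Array[Long]]
        while (true) sink += new Array[Long](1 << 20) // ~8 MB per iteration
      }
    }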

hooks.synchronized {
hooks.clear()
}
}
}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -29,4 +29,6 @@ private[spark] object SparkExitCode {
OutOfMemoryError. */
val OOM = 52

val VM_ERROR = 53

}
core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -25,9 +25,16 @@ import org.apache.spark.internal.Logging
*
* @param exitOnUncaughtException Whether to exit the process on UncaughtException.
*/
private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
private[spark] class SparkUncaughtExceptionHandler(
val exitOnUncaughtException: Boolean = true,
isDriver: Boolean = false)
extends Thread.UncaughtExceptionHandler with Logging {

private def sysExit(error: Int, clearHooks: Boolean = false): Unit = {
if (clearHooks) ShutdownHookManager.shutdownHooks.clear()
System.exit(error)
}

override def uncaughtException(thread: Thread, exception: Throwable) {
try {
// Make it explicit that uncaught exceptions are thrown when container is shutting down.
@@ -41,13 +48,19 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException:
if (!ShutdownHookManager.inShutdown()) {
exception match {
case _: OutOfMemoryError =>
System.exit(SparkExitCode.OOM)
// We need to exit immediately; there is no room for a graceful shutdown here,
// and we cannot assume the JVM is healthy.
sysExit(SparkExitCode.OOM, isDriver)
case e: SparkFatalException if e.throwable.isInstanceOf[OutOfMemoryError] =>
// SPARK-24294: This is defensive code, in case that SparkFatalException is
// misused and uncaught.
System.exit(SparkExitCode.OOM)
sysExit(SparkExitCode.OOM, isDriver)
case _: VirtualMachineError if exitOnUncaughtException =>
// We need to exit immediately; there is no room for a graceful shutdown here,
// and we cannot assume the JVM is healthy.
sysExit(SparkExitCode.VM_ERROR, isDriver)
case _ if exitOnUncaughtException =>
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
sysExit(SparkExitCode.UNCAUGHT_EXCEPTION)
}
}
} catch {