
[SPARK-27900][CORE] Add uncaught exception handler to the driver #24796

Closed

Conversation


@skonto skonto commented Jun 4, 2019

What changes were proposed in this pull request?

In case of JVM errors the driver will not exit properly because there is no UncaughtExceptionHandler. This causes issues when Spark runs in a container: the JVM will not exit due to still-running threads, errors are not propagated to the K8s runtime, and pods run forever. It also makes it impossible for the Spark Operator to report back a failed job.
As described in the related JIRA, JVM errors may cause deadlocks and we cannot assume a healthy JVM will do a proper shutdown, so we skip the registered shutdown hooks; this is the equivalent of setting -XX:+ExitOnOutOfMemoryError. For example, the DAG event loop thread is a daemon thread and, in the scenario described in the JIRA, becomes unresponsive while the main thread is stuck in the runJob() method waiting forever for the job submission. However, this PR does not change the handler logic for the master, the workers in standalone mode, or the Spark executors. It only adds a special behavior for the driver, where we exit immediately.
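
Roughly, the idea looks like the following sketch (illustrative only, not the exact code in this PR; the object name and the use of halt() are assumptions):

import java.lang.Thread.UncaughtExceptionHandler

// Illustrative sketch only. 52 mirrors the container exit code shown below.
object DriverFailFast {
  def install(): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = e match {
        case _: VirtualMachineError =>
          // A JVM that just hit an OOM/VM error cannot be trusted to run shutdown hooks;
          // halt() skips them, similar in spirit to -XX:+ExitOnOutOfMemoryError.
          Runtime.getRuntime.halt(52)
        case _ =>
          Runtime.getRuntime.halt(50) // generic uncaught-exception exit code
      }
    })
  }
}

The actual PR reuses SparkUncaughtExceptionHandler and wires the registration through spark-submit (see the review comments below).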

How was this patch tested?

Manually, by running a Spark job.
This makes the example job from the JIRA fail, and the container exits with:

State:          Terminated
  Reason:       Error
  Exit Code:    52

skonto commented Jun 4, 2019

@srowen @vanzin @squito @erikerlandson pls review.

// set the handler as early as possible
// we need the driver check because spark-submit is used by mesos to launch
// executors (via the SparkLauncher)
val isExecutor = args.childArgs.contains("--executor-id")
@skonto skonto Jun 4, 2019

Not the best, but at this point this is what is available (the alternative is to add it on the SparkContext side, where SparkEnv can tell us whether we are in the driver JVM).

case _: VirtualMachineError if exitOnUncaughtException =>
// we need to exit immediately no space for graceful shutdown here
// we cannot assume the jvm is healthy
sysExit(SparkExitCode.OOM, isDriver)
Member

This seems like the wrong exit code; maybe a new one is needed, not sure.

@skonto skonto Jun 4, 2019

Yeah I probably need to figure out some other generic one.

@skonto (Contributor Author)

Fixed it.

@@ -204,6 +204,11 @@ private [util] class SparkShutdownHookManager {
hooks.synchronized { hooks.remove(ref) }
}

def clear(): Unit = {
Member

I'm still not so clear why you want to forget all the hooks that were registered -- what's the scenario where they prevent termination, and can we fix the hook? Seems like we'd want them to try to complete.

@skonto skonto Jun 4, 2019

The scenario is described in the JIRA. I see a deadlock. The DAG event loop thread dies because it tries to schedule a huge number of tasks, gets an OOM, and becomes unresponsive (it never wakes from the interrupt). I can point to the full jstack output if we need to dig further into this.
Then the shutdown hook blocks without the DAG scheduler being able to finish. The scheduler stop logic waits here.

In general I am not confident we can predict the state of the JVM after an OOM, or what else can go wrong during a shutdown; that is why an immediate exit is one good option (maybe not the only one).

Member

Is it a deadlock? I understand just exiting from the driver anyway when an uncaught exception happens; I'm just not so clear why one would remove the hooks. If they execute, good, or are you saying they're the issue? If so, is there a fix for the hook instead? I'm not sure it's better to not execute them.

@skonto skonto Jun 4, 2019

I tried to fix it, but the event loop thread will not get interrupted, so it cannot complete the join (btw it's a daemon thread). I will paste the jstack output shortly so you can have a look; that is my view so far.

@skonto skonto Jun 4, 2019

@srowen here it is: https://gist.github.com/skonto/74181e434a727901d4f3323461c1050b
I commented out the clear call. One other (independent) thing I noticed is that the main thread is also stuck here:

ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)

Blocking forever there might be a problem if something goes wrong.
Now, if you check the output:

"Thread-1" #10 prio=5 os_prio=0 tid=0x000055d323902000 nid=0x7c in Object.wait() [0x00007fdccd08a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	- locked <0x00000000ebe00e50> (a org.apache.spark.util.EventLoop$$anon$1)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2100)

This will never finish; it waits at the join (EventLoop.scala:81) for dag-scheduler-event-loop:

"dag-scheduler-event-loop" #45 daemon prio=5 os_prio=0 tid=0x000055d323a25000 nid=0x48 in Object.wait() [0x00007fdccd6d2000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	- locked <0x00000000eb4f3b58> (a org.apache.hadoop.util.ShutdownHookManager$1)
	at java.lang.Thread.join(Thread.java:1326)
	at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:107)
	at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
	at java.lang.Shutdown.runHooks(Shutdown.java:123)
	at java.lang.Shutdown.sequence(Shutdown.java:167)
	at java.lang.Shutdown.exit(Shutdown.java:212)
	- locked <0x00000000eb3848b8> (a java.lang.Class for java.lang.Shutdown)
	at java.lang.Runtime.exit(Runtime.java:109)
	at java.lang.System.exit(System.java:971)
	at org.apache.spark.util.SparkUncaughtExceptionHandler.sysExit(SparkUncaughtExceptionHandler.scala:35)
	at org.apache.spark.util.SparkUncaughtExceptionHandler.uncaughtException(SparkUncaughtExceptionHandler.scala:53)
	at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
	at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
	at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)

which waits for the shutdown hook to finish, a hook that was invoked by the OOM this very thread produced. So it's a deadlock. If you check the log, the OOM comes from the thread that tries to submit 1M tasks ;). dag-scheduler-event-loop -> shutdownHook -> calls join from the other thread and waits for dag-scheduler-event-loop (deadlock).
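
To see the cycle in isolation, here is a minimal standalone sketch (plain Scala, not Spark code; thread names are illustrative) of a shutdown hook joining the very thread that triggered System.exit:

object ShutdownJoinDeadlock {
  def main(args: Array[String]): Unit = {
    // Stand-in for dag-scheduler-event-loop: its failure path calls System.exit,
    // which runs the shutdown hooks and waits for them to complete.
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        println("worker: failing, calling System.exit")
        System.exit(52)
      }
    }, "worker")

    // Stand-in for the SparkContext shutdown hook -> DAGScheduler.stop -> EventLoop.stop:
    // it joins the worker, but the worker never terminates because it is itself
    // blocked inside System.exit waiting for this very hook.
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = {
        println("hook: joining worker")
        worker.join()
        println("hook: done") // never reached
      }
    }, "hook"))

    worker.start()
    worker.join()
  }
}

Running it should hang forever: the worker sits inside System.exit waiting for the hooks to finish, while the hook sits in join() waiting for the worker.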

@zsxwing (Member)

I tried to fix it but the Event loop thread will not get interrupted so it cannot complete the join (btw its a daemon thread). I will paste the jstack output shortly so you can have a look, that is my view so far.

Do you know why the event loop thread won't get interrupted? I tried to reproduce the deadlock locally using this change zsxwing@b1aecf2 but I could not reproduce the following hang:

"Thread-1" #10 prio=5 os_prio=0 tid=0x000055d323902000 nid=0x7c in Object.wait() [0x00007fdccd08a000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	- locked <0x00000000ebe00e50> (a org.apache.spark.util.EventLoop$$anon$1)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2100)

but saw a different behavior: the JVM just exits when reaching eventThread.join(), as I don't see any output after eventThread.join(). Here is all the output I got:

scala> sc.parallelize(1 to 10).count()
[Stage 0:>                                                          (0 + 8) / 8]19/06/05 12:52:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[dag-scheduler-event-loop,5,main]
java.lang.OutOfMemoryError: foo
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1344)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2172)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2113)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
stopping Thread[dag-scheduler-event-loop,5,main] in Thread[Thread-1,5,main]
waiting for Thread[dag-scheduler-event-loop,5,main] in Thread[Thread-1,5,main]

My JDK version is 1.8.0_171. Maybe Thread.join in a shutdown hook behaves differently in different JDK versions.

@skonto skonto Jun 7, 2019

@zsxwing interesting... I use the JVM in the Alpine Docker image used for Spark on K8s, openjdk:8-alpine:

sh-4.4$ java -version
openjdk version "1.8.0_201"
OpenJDK Runtime Environment (IcedTea 3.11.0) (Alpine 8.201.08-r1)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)

which is the latest...
Btw, from what I read there is no guarantee that shutdown hooks will run to completion
(https://dzone.com/articles/know-jvm-series-2-shutdown); that is why I thought just exiting is the most reliable option, and that is why I cleared the hooks, although that is not satisfying in every case.
"First thing to keep in mind is that it is not guaranteed that shutdown hooks will always run. If the JVM crashes due to some internal error, then it might crash down without having a chance to execute a single instruction."
In addition, even if this will not happen often, shutdown hooks need to finish fast anyway so they stand a chance of completing (especially with JVM errors), and waiting in a join when you don't know the state of things is dangerous anyway.

Contributor

@zsxwing it's possible your change is not triggering the same thing, because throwing an OOM from within Java code is not the same as an OOM from within the JVM itself (IIRC, when the user throws the OOM themselves, none of the JVM -XX options related to OOMs get triggered).

SparkQA commented Jun 4, 2019

Test build #106166 has finished for PR 24796 at commit 537b76d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

skonto commented Jun 4, 2019

Don't see how these unit tests relate to this PR, weird.

SparkQA commented Jun 4, 2019

Test build #106168 has finished for PR 24796 at commit affcb04.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

skonto commented Jun 4, 2019

I don't see how:

org.apache.spark.launcher.SparkLauncherSuite.testSparkLauncherGetError
org.apache.spark.sql.hive.thriftserver.CliSuite.Single command with --database

relate to this PR.

srowen commented Jun 5, 2019

I think the test failure is related somehow as it concerns how the app exits; not exactly sure yet.

I think removing all the hooks is too heavy a hammer. It seems like EventLoop.stop() blocking on the event thread is problematic here.

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop

@zsxwing is this join() really necessary? I see the comment here. But here, an OOM error crashes the thread, which invokes the shutdown hooks, which stop the EventLoop, but then it's waiting on itself to stop.

If this is removed, I am not sure what goes wrong. If onStop happens before or after an onReceive during shutdown, that's fine. If it happens during onReceive, well, it has already been interrupted and won't complete normally.

skonto commented Jun 5, 2019

@srowen Ideally, yes, we want the graceful shutdown without this deadlock if possible. My concern is: can we actually be sure things will not deadlock elsewhere? We probably need to check the threads allocated in general and involved in the shutdown.

val isExecutor = args.childArgs.contains("--executor-id")
args.deployMode match {
case "client" | null if !isExecutor =>
Thread.setDefaultUncaughtExceptionHandler(
Member

I remember there was an argument that we should not set the default UncaughtExceptionHandler for the driver, as Spark doesn't control the whole driver JVM and we should not just decide for the user what should be done in this JVM. They may have their own UncaughtExceptionHandler.

@skonto skonto Jun 5, 2019

There should be documentation or something about what is provided (because Spark on K8s is affected, for example), but even if the user adds a handler there will be a deadlock here (which could be fixed, I guess). In general I don't see people setting one, and if the same code is called from Python (which drives the JVM behind the scenes) you need a default handler in that case anyway, I guess. My 2 cents.

@zsxwing zsxwing Jun 6, 2019

Yeah, I was actually supporting this since if a Spark thread crashes, SparkContext and other components probably will not work correctly. Could not find the related PR or JIRA ticket though...

One alternative solution is just setting up an UncaughtExceptionHandler for all Spark-created threads. Then this would not impact any user threads.
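
A minimal sketch of that alternative, assuming a ThreadFactory-style hook point (the object and method names below are illustrative, not Spark's actual API):

object SparkThreadFactories {
  import java.util.concurrent.ThreadFactory
  import java.util.concurrent.atomic.AtomicInteger

  // Illustrative: scope the handler to Spark-created threads instead of installing
  // a process-wide default that could override one the user set themselves.
  def withHandler(prefix: String, handler: Thread.UncaughtExceptionHandler): ThreadFactory =
    new ThreadFactory {
      private val counter = new AtomicInteger(0)
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)
        t.setUncaughtExceptionHandler(handler) // applies only to this thread
        t
      }
    }
}

Threads created through such a factory would fail fast via the given handler, while any default handler the user installed stays in effect for their own threads.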

@skonto skonto Jun 7, 2019

This is currently done only for the FsHistoryProvider (unrelated to Spark core):

    initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
      (_: Thread, e: Throwable) => {
        logError("Error initializing FsHistoryProvider.", e)
        System.exit(1)
      }))

I am not sure how this will help as we will have to exit from there anyway, no?

zsxwing commented Jun 5, 2019

If this is removed, I am not sure what goes wrong. If onStop happens before or after an onReceive during shutdown, that's fine. If it happens during onReceive, well, it has already been interrupted and won't complete normally.

The potential issue is about thread-safety. Right now EventLoop ensures all onXXX calls run in sequence without any overlapping.

srowen commented Jun 6, 2019

@zsxwing but without the join(), you still don't have these executing at the same time. onReceive() is interrupted before onStop(), so it can't proceed or complete after onStop(). That seems... OK?

zsxwing commented Jun 7, 2019

onReceive() is interrupted before onStop()

But there will be a race condition if we remove the join. We cannot guarantee that onReceive returns immediately when it receives the interrupt signal.

By the way, is there any theory about how this deadlock can happen? As I mentioned here: #24796 (comment) I could not reproduce it.

skonto commented Jun 7, 2019

@zsxwing I describe how this happened in the JIRA ticket. I just ran SparkPi on K8s with 1M as the input parameter. This creates 1M tasks (an array holds them), which causes an OOM error in the DAGScheduler event loop thread, since that is the thread that will eventually try to submit the actual job; of course my JVM memory settings are tight enough to reproduce it, for the values please have a look at the JIRA ticket. This could also happen in other cases where the JVM is running out of memory and this thread needs to allocate more at some point. Btw, I can reproduce it on K8s consistently, it fails every time.

One other thing is that in the code base there are other places where there is a join on a thread that will be stopped via the shutdown hook, like ContextCleaner, and as I mentioned above the shutdown hook does a lot of work, e.g. the SparkContext stop() method stops a lot of stuff (not to mention there is one hook for Streaming as well).

skonto commented Jun 7, 2019

@zsxwing @srowen one solution that works is running the logic of stop in another thread:

def stop(): Unit = {
  if (stopped.compareAndSet(false, true)) {
    eventThread.interrupt()
    @volatile var onStopCalled = false
    // Do the blocking join in a separate daemon thread, so that a shutdown hook
    // calling stop() cannot block forever on a thread that is itself stuck in the hooks.
    new Thread("event-loop-stop") {
      setDaemon(true)
      override def run(): Unit = {
        try {
          eventThread.join()
          // Call onStop after the event thread exits to make sure onReceive happens before onStop
          onStopCalled = true
          onStop()
        } catch {
          case ie: InterruptedException =>
            Thread.currentThread().interrupt()
            if (!onStopCalled) {
              // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop`
              // since it's already called.
              onStop()
            }
        }
      }
    }.start()
  } else {
    // Keep quiet to allow calling `stop` multiple times.
  }
}

Assuming that is safe, we let the shutdown hook proceed... Are there any issues with letting that run in the background, or is there a strict order in which things should run? One concern is that it may never run unless its thread priority is high enough...

19/06/07 10:31:21 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[dag-scheduler-event-loop,5,main]
java.lang.OutOfMemoryError: Java heap space
	at scala.collection.mutable.ResizableArray.ensureSize(ResizableArray.scala:106)
	at scala.collection.mutable.ResizableArray.ensureSize$(ResizableArray.scala:96)
	at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:49)
	at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
	at org.apache.spark.scheduler.TaskSetManager.addPendingTask(TaskSetManager.scala:264)
	at org.apache.spark.scheduler.TaskSetManager.$anonfun$addPendingTasks$2(TaskSetManager.scala:194)
	at org.apache.spark.scheduler.TaskSetManager$$Lambda$1106/850290016.apply$mcVI$sp(Unknown Source)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.scheduler.TaskSetManager.$anonfun$addPendingTasks$1(TaskSetManager.scala:193)
	at org.apache.spark.scheduler.TaskSetManager$$Lambda$1105/1310826901.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:534)
	at org.apache.spark.scheduler.TaskSetManager.addPendingTasks(TaskSetManager.scala:192)
	at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:189)
	at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:252)
	at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:210)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1233)
	at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1084)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1028)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2126)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2118)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2107)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
19/06/07 10:31:21 INFO SparkContext: Invoking stop() from shutdown hook
19/06/07 10:31:21 INFO SparkUI: Stopped Spark web UI at http://spark-pi2-1559903354898-driver-svc.spark.svc:4040
19/06/07 10:31:21 INFO BlockManagerInfo: Removed broadcast_0_piece0 on spark-pi2-1559903354898-driver-svc.spark.svc:7079 in memory (size: 1765.0 B, free: 110.0 MiB)
19/06/07 10:31:21 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
19/06/07 10:31:21 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
19/06/07 10:31:21 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
19/06/07 10:31:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/07 10:31:22 INFO MemoryStore: MemoryStore cleared
19/06/07 10:31:22 INFO BlockManager: BlockManager stopped
19/06/07 10:31:22 INFO BlockManagerMaster: BlockManagerMaster stopped
19/06/07 10:31:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/07 10:31:22 INFO SparkContext: Successfully stopped SparkContext
19/06/07 10:31:22 INFO ShutdownHookManager: Shutdown hook called
19/06/07 10:31:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-fef9ec63-c71e-4859-9910-12c51a336d75
19/06/07 10:31:22 INFO ShutdownHookManager: Deleting directory /var/data/spark-6a47a0e7-9676-4cf4-96e1-07d710838b8e/spark-657ed153-d11d-478d-94a8-955a80296405
    State:          Terminated
      Reason:       Error
      Exit Code:    52
      Started:      Fri, 07 Jun 2019 13:29:35 +0300
      Finished:     Fri, 07 Jun 2019 13:31:22 +0300

skonto commented Jun 11, 2019

@zsxwing @srowen any decision on how to approach this?

srowen commented Jun 11, 2019

The async option is a little bit hacky and complex, but not bad. I can't think of a better solution. Can it be simpler with a Future? Probably not, and we lose control over the thread that's used.

zsxwing commented Jun 11, 2019

Before making a decision, I would like to understand why this deadlock can happen. There are still some unknowns here, for example, why "eventThread.interrupt()" doesn't interrupt Thread.join in the dag-scheduler-event-loop thread. Is it just a special behavior in shutdown hooks, and can it happen in other cases as well? Without the answers, we don't know if a fix can really fix the root issue.

skonto commented Jun 13, 2019

@zsxwing we can only fix the related issue here, i.e. avoid the deadlock so that shutdown finishes. As for the generic case, I don't see why this thread is not interrupted; maybe because this is a special case of handling an uncaught exception via a handler running on the very thread that caused it. I will check what the JVM does in this case, but if anyone knows more feel free to call them in here.
We are probably hitting this one: https://bugs.openjdk.java.net/browse/JDK-8154017, mentioned here: jacoco/jacoco#394 (comment).
The fix in there ignores all interrupts until the hooks are completed (we call exit as well), so since the uncaught exception handler executes a shutdown hook from the event loop thread, that thread cannot be interrupted. @shipilev Hi! I saw you reported that error and also discussed the related fix; any help would be great, as I don't have the rights to comment on the ticket directly.
Another question: do we have this pattern elsewhere, like the Master, Worker or Executor, where there is already a handler? @srowen thoughts?

@squito squito left a comment

jumping in a bit late here, will try to poke at this a bit myself -- but I'm wondering why you didn't try the -XX:+ExitOnOutOfMemoryError option? I thought that had the properties you were looking for.

squito commented Jun 21, 2019

I tried to make a very minimal example of interrupt, joins, OOMs and uncaught exception handlers here:

https://gist.github.com/squito/66daf60be5a6fa4415c3b880150d4e8c

Please check if I set that up correctly. But the high-level takeaway is that most of the time the interrupt is NOT received, and the join seems to complete successfully. Once in a while (1/10 or so?) the interrupt is received.

skonto commented Jun 21, 2019

@squito I tried -XX:+ExitOnOutOfMemoryError (described in the JIRA), but the thing is that executors do have proper uncaught exception handling, while the driver does not. So initially I tried to add something like that so things shut down gracefully if possible given the JVM state, but without the shutdown hooks. If shutdown hooks are enabled, we have this issue with the joins (which is 100% reproducible). Also, if you run the Pi example on minikube, Spark will get stuck, so the initial issue where there is no uncaught exception handler is also 100% reproducible. Regarding the interrupts, I haven't seen them work so far, but who knows, maybe I got unlucky; it also depends on the JDK version, because there are fixes, as described in my last comment, which changed the interrupt behavior. I called out the OpenJDK guys to see if their fix relates to this, no response yet. I will try to do the join in another thread and make sure it happens with higher priority; so far there is no guarantee for this.

squito commented Jun 24, 2019

My point about the inconsistency in my more minimal repro is just that we might not be able to rely on the usual semantics of joins & interrupts after an OOM. Also a reason I'd rather trust -XX:+ExitOnOutOfMemoryError as something that's part of the JVM itself (and perhaps it should be added to the executors too for the same reasons). Though one argument against using it is that it was only available starting with Java 8u92 (do we specify a minimal version within Java 1.8?) and it's a non-standard option. The old way to do this is with -XX:OnOutOfMemoryError="kill -9 %p".

@kiszk @mridulm @tgravescs -- maybe you have some thoughts on this one from prior experience with JVM error conditions?

mridulm commented Jun 25, 2019

I had added exit on OOM as part of the initial YARN integration, for both driver and executor.
Over the years this has been (incorrectly, IMO) removed from Spark with attempts to handle it gracefully.
Unlike normal errors/exceptions, OOM cannot be handled gracefully in a complex app. In the context of Spark, the right thing is to fail fast.

skonto commented Jul 2, 2019

I agree with @mridulm; that was my initial statement too, even though I don't have previous experience here.

@tgravescs

I agree with @mridulm, fail fast on OOM, as you get weird things happening otherwise. I've never seen a hang like the one described other than when users start non-daemon threads that likewise don't exit.

skonto commented Jul 3, 2019

@srowen @zsxwing should I fall back to the initial approach of clearing the shutdown hooks and exiting immediately? How should I proceed (the only thing I haven't tried is to run the stop logic in a thread with high priority and a dedicated thread pool, just in case that works and the stop logic gets a chance to run)?
I don't see a lot of alternatives here (still waiting for @shipilev).

squito commented Jul 3, 2019

I feel like the consensus here is to not change Spark's shutdown hooks etc., and instead change the Spark-on-K8s setup to run the driver with -XX:OnOutOfMemoryError="kill -9 %p".

skonto commented Jul 7, 2019

@squito regarding -XX:OnOutOfMemoryError="kill -9 %p", I think there are more options now
(https://stackoverflow.com/questions/5792049/xxonoutofmemoryerror-kill-9-p-problem). After Java 8u92 I use:

-XX:+ExitOnOutOfMemoryError
-XX:+CrashOnOutOfMemoryError

Anyway, if that is the consensus, let's all agree. What about the executors? They have a shutdown handler... should they? I think this issue goes beyond K8s, it affects all deployments. Also, sometimes you may want to collect the crash report, so CrashOnOutOfMemoryError may be a better option in some scenarios.
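
For illustration, assuming the flags are simply passed through the standard extraJavaOptions confs (whether the K8s entrypoint should set them instead is the open question here), usage would look roughly like:

# Sketch only: fail-fast on OOM for the driver (and executors) via standard Spark confs;
# -XX:+ExitOnOutOfMemoryError needs JDK 8u92+, as discussed above.
spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:+ExitOnOutOfMemoryError" \
  --conf "spark.executor.extraJavaOptions=-XX:+ExitOnOutOfMemoryError" \
  ...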

skonto commented Jul 12, 2019

@squito @srowen @dongjoon-hyun by having a handler (as mentioned in the ticket by HenryYu) without running the shutdown hooks, we could also solve https://issues.apache.org/jira/browse/SPARK-27812

squito commented Jul 16, 2019

YARN does add the same thing to the executors: https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala#L124

To be honest, I have no idea why all cluster managers don't use that -- I don't have enough experience to feel confident about them, but I'd suggest we do the same for Kubernetes (@mccheah @yifeih @ifilonenko do you know of any particular reason you would not set it on Kubernetes?)

@ifilonenko

I can see no reason why Kubernetes shouldn't include a kill on OOM. I agree that this should be extended to Kubernetes. These OOM issues do cause problems for our operators, so this would be useful for reporting back a failed job. @skonto I'd prefer to see -XX:+ExitOnOutOfMemoryError added to the kube entrypoint, at the very least to have parity with YARN.

mccheah commented Jul 16, 2019

Yeah adding the uncaught exception hook shouldn't hurt here. As long as this doesn't eliminate the pod from the system (so that one can access the logs if necessary), it should be fine.

@felixcheung

Yay from me.

squito commented Jul 17, 2019

Wait, you are not all saying the same thing. @mccheah I am suggesting that we add -XX:OnOutOfMemoryError="kill -9 %p" to Kubernetes; I am NOT proposing we add an uncaught exception handler. (I do not think we should use the newer flags, which are not present in all versions of Java 8.)

Adding an uncaught exception handler is tricky on the driver because the user may have registered their own uncaught exception handler. https://issues.apache.org/jira/browse/SPARK-27812 gives a reason we maybe should do that too. But regardless of whether we decide to do that or not, I'm still suggesting we add -XX:OnOutOfMemoryError.

skonto commented Jul 19, 2019

OK, if we all agree I can open a PR to add -XX:OnOutOfMemoryError at the entrypoint with the old flag (although Java 11 is coming and people are encouraged to use the latest version for security reasons, so we should do the same in our images). @squito all executors get a handler here:

uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
and set it here:
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler)

CoarseGrainedExecutorBackend, which is used by K8s, creates an executor instance that sets that.
@felixcheung @squito @mccheah @ifilonenko
The problem is https://issues.apache.org/jira/browse/SPARK-27812, which can appear due to another type of exception besides OOM (the only solution there is to add a handler, AFAIK).

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 28, 2019
@github-actions github-actions bot closed this Dec 29, 2019