[SPARK-20217][core] Executor should not fail stage if killed task throws non-interrupted exception #17531
Conversation
Test build #75525 has finished for PR 17531 at commit
```diff
@@ -432,7 +432,7 @@ private[spark] class Executor(
         setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-      case _: InterruptedException if task.reasonIfKilled.isDefined =>
+      case _: Throwable if task.reasonIfKilled.isDefined =>
```
Throwable -> NonFatal
@mridulm, that was my initial reaction as well, but I noticed that we're already catching Throwable down on line 447.
That said, the existing Throwable case has logic to exit the JVM in case the throwable was fatal (see Utils.isFatalError(t), used further down). We probably want to preserve the behavior of continuing to exit on fatal errors while still trying to issue a final "dying breath" task status. Thus maybe we can catch NonFatal up here, as you've proposed, and add another piece of conditional logic to handle fatal errors in the existing Throwable case. This adds a bit of complexity and we'd need another test case to fully cover it, but that could be done by having a task which throws OutOfMemoryError once interrupted.
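For reference, a minimal, self-contained sketch (plain Scala, not the actual Executor code; the object and method names are made up for illustration) of why the Throwable-vs-NonFatal distinction matters: scala.util.control.NonFatal deliberately does not match fatal errors such as OutOfMemoryError, so a `case NonFatal(_)` clause would let those fall through to the existing catch-all that exits the JVM.

```scala
import scala.util.control.NonFatal

// Standalone illustration only -- not part of Spark's Executor.
object NonFatalVsThrowableDemo {
  def classify(t: Throwable): String = t match {
    case NonFatal(_) => "non-fatal: safe to report a final KILLED status and keep the JVM alive"
    case _           => "fatal: fall through to the catch-all Throwable handler, which exits the JVM"
  }

  def main(args: Array[String]): Unit = {
    println(classify(new RuntimeException("wrapped interrupt"))) // non-fatal
    println(classify(new OutOfMemoryError("Java heap space")))   // fatal
  }
}
```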
I think NonFatal is probably fine here -- an OOM could presumably cause these messages to be dropped and the task marked as failed anyways.
```diff
@@ -432,7 +432,7 @@ private[spark] class Executor(
         setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-      case _: InterruptedException if task.reasonIfKilled.isDefined =>
+      case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
```
ah, task != null, excellent catch; completely missed that!
LGTM, thanks for the fix @ericl!
I will leave it around a bit in case @JoshRosen has any further comments. Feel free to merge btw if you don't!
Test build #75552 has finished for PR 17531 at commit
Test build #75553 has finished for PR 17531 at commit
test this please
LGTM pending Jenkins.
Test build #75559 has finished for PR 17531 at commit
Thanks. Merging to master.
…thrown by cancelled tasks

What changes were proposed in this pull request?
This was a regression introduced by my earlier PR here: #17531. It turns out NonFatal() does not in fact catch InterruptedException.

How was this patch tested?
Extended cancellation unit test coverage. The first test fails before this patch.

cc JoshRosen mridulm

Author: Eric Liang <ekl@databricks.com>
Closes #17659 from ericl/spark-20358.
(cherry picked from commit b2ebadf)
Signed-off-by: Yin Huai <yhuai@databricks.com>
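As a quick self-contained illustration of the behavior described in that follow-up (plain Scala, not Spark code; this only exercises the standard library's scala.util.control.NonFatal):

```scala
import scala.util.control.NonFatal

object InterruptedNotNonFatal {
  def main(args: Array[String]): Unit = {
    // NonFatal treats InterruptedException like a fatal error and does not match it,
    // so a `case NonFatal(_)` clause will not see interrupts from killed tasks.
    println(NonFatal(new InterruptedException()))   // false
    println(NonFatal(new RuntimeException("boom"))) // true
  }
}
```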
What changes were proposed in this pull request?
If tasks throw non-interrupted exceptions on kill (e.g. java.nio.channels.ClosedByInterruptException), their death is reported back as TaskFailed instead of TaskKilled. This causes stage failure in some cases.
This is reproducible as follows. Run the following, and then use SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will fail since we threw a RuntimeException instead of InterruptedException.
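```scala
spark.range(100).repartition(100).foreach { i =>
  try {
    Thread.sleep(10000000)
  } catch {
    case t: InterruptedException => throw new RuntimeException(t)
  }
}
```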
Based on the code in TaskSetManager, I think this also affects kills of speculative tasks. However, since the number of speculated tasks is small, and usually you need to fail a task a few times before the stage is cancelled, it is unlikely this would be noticed in production unless both speculation was enabled and the number of allowed task failures was set to 1.
We should probably unconditionally return TaskKilled instead of TaskFailed if the task was killed by the driver, regardless of the actual exception thrown.
How was this patch tested?
Unit test. The test fails before the change in Executor.scala
cc @JoshRosen