-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-1396] Properly cleanup DAGScheduler on job cancellation. #305
Conversation
cc @markhamstra and @rxin |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13695/ |
Merged build triggered. |
Merged build started. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13696/ |
Jenkins, retest this please |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
test("run shuffle with map stage failure") { | ||
val shuffleMapRdd = makeRdd(2, Nil) | ||
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) | ||
val shuffleId = shuffleDep.shuffleId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used
Merged build triggered. |
Merged build started. |
Thanks for reviewing @markhamstra -- made the changes you suggested and will merge later today if you don't see any other issues! |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13846/ |
Jenkins, retest this please |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13848/ |
Previously, when jobs were cancelled, not all of the state in the DAGScheduler was cleaned up, leading to a slow memory leak in the DAGScheduler. As we expose easier ways ot cancel jobs, it's more important to fix these issues. This commit adds 3 tests. “run shuffle with map stage failure” is a new test to more thoroughly test this functionality, and passes on both the old and new versions of the code. “trivial job cancellation” fails on the old code because all state wasn’t cleaned up correctly when jobs were cancelled (we didn’t remove the job from resultStageToJob). “failure of stage used by two jobs” fails on the old code because taskScheduler.cancelTasks wasn’t called for one of the stages (see test comments).
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
I've merged this into master |
[I wanted to post this for folks to comment but it depends on (and thus includes the changes in) a currently outstanding PR, #305. You can look at just the second commit: kayousterhout@93f08ba to see just the changes relevant to this PR] Previously, when stages fail or get cancelled, the SparkListener is only notified indirectly through the SparkListenerJobEnd, where we sometimes pass in a single stage that failed. This worked before job cancellation, because jobs would only fail due to a single stage failure. However, with job cancellation, multiple running stages can fail when a job gets cancelled. Right now, this is not handled correctly, which results in stages that get stuck in the “Running Stages” window in the UI even though they’re dead. This PR changes the SparkListenerStageCompleted event to a SparkListenerStageEnded event, and uses this event to tell SparkListeners when stages fail in addition to when they complete successfully. This change is NOT publicly backward compatible for two reasons. First, it changes the SparkListener interface. We could alternately add a new event, SparkListenerStageFailed, and keep the existing SparkListenerStageCompleted. However, this is less consistent with the listener events for tasks / jobs ending, and will result in some code duplication for listeners (because failed and completed stages are handled in similar ways). Note that I haven’t finished updating the JSON code to correctly handle the new event because I’m waiting for feedback on whether this is a good or bad idea (hence the “WIP”). It is also not backwards compatible because it changes the publicly visible JobWaiter.jobFailed() method to no longer include a stage that caused the failure. I think this change should definitely stay, because with cancellation (as described above), a failure isn’t necessarily caused by a single stage. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #309 from kayousterhout/stage_cancellation and squashes the following commits: 5533ecd [Kay Ousterhout] Fixes in response to Mark's review 320c7c7 [Kay Ousterhout] Notify SparkListeners when stages fail or are cancelled.
Previously, when jobs were cancelled, not all of the state in the DAGScheduler was cleaned up, leading to a slow memory leak in the DAGScheduler. As we expose easier ways to cancel jobs, it's more important to fix these issues. This commit also fixes a second and less serious problem, which is that previously, when a stage failed, not all of the appropriate stages were cancelled. See the "failure of stage used by two jobs" test for an example of this. This just meant that extra work was done, and is not a correctness problem. This commit adds 3 tests. “run shuffle with map stage failure” is a new test to more thoroughly test this functionality, and passes on both the old and new versions of the code. “trivial job cancellation” fails on the old code because all state wasn’t cleaned up correctly when jobs were cancelled (we didn’t remove the job from resultStageToJob). “failure of stage used by two jobs” fails on the old code because taskScheduler.cancelTasks wasn’t called for one of the stages (see test comments). This should be checked in before apache#246, which makes it easier to cancel stages / jobs. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes apache#305 from kayousterhout/incremental_abort_fix and squashes the following commits: f33d844 [Kay Ousterhout] Mark review comments 9217080 [Kay Ousterhout] Properly cleanup DAGScheduler on job cancellation.
[I wanted to post this for folks to comment but it depends on (and thus includes the changes in) a currently outstanding PR, apache#305. You can look at just the second commit: kayousterhout@93f08ba to see just the changes relevant to this PR] Previously, when stages fail or get cancelled, the SparkListener is only notified indirectly through the SparkListenerJobEnd, where we sometimes pass in a single stage that failed. This worked before job cancellation, because jobs would only fail due to a single stage failure. However, with job cancellation, multiple running stages can fail when a job gets cancelled. Right now, this is not handled correctly, which results in stages that get stuck in the “Running Stages” window in the UI even though they’re dead. This PR changes the SparkListenerStageCompleted event to a SparkListenerStageEnded event, and uses this event to tell SparkListeners when stages fail in addition to when they complete successfully. This change is NOT publicly backward compatible for two reasons. First, it changes the SparkListener interface. We could alternately add a new event, SparkListenerStageFailed, and keep the existing SparkListenerStageCompleted. However, this is less consistent with the listener events for tasks / jobs ending, and will result in some code duplication for listeners (because failed and completed stages are handled in similar ways). Note that I haven’t finished updating the JSON code to correctly handle the new event because I’m waiting for feedback on whether this is a good or bad idea (hence the “WIP”). It is also not backwards compatible because it changes the publicly visible JobWaiter.jobFailed() method to no longer include a stage that caused the failure. I think this change should definitely stay, because with cancellation (as described above), a failure isn’t necessarily caused by a single stage. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes apache#309 from kayousterhout/stage_cancellation and squashes the following commits: 5533ecd [Kay Ousterhout] Fixes in response to Mark's review 320c7c7 [Kay Ousterhout] Notify SparkListeners when stages fail or are cancelled.
* Clean up resources that are not used by pods. * Make client side send correct credentials. * Simplify cleanup logic. Cancellation is no longer instantaneous and we might clean up a little later than the given TTL. However, the tradeoff is a simpler implementation with clearer contracts about when things will and will not be cleaned up. * Remove class * Fix imports and line length. * Remove import. * Add a unit test for StagingResourcesStore. * Revamp cleanup process. - Delete resources immediately when owners do not exist - Delete resources if after they are first uploaded, they are not accessed for a certain period of time. - Resource owners are more specifically defined and can have a type (currently only uses pods) * Clarify log messages * Use a single set of credentials in resource staging server. Also refactors construction of Kubernetes Clients to unify the code paths. * Fix unit test. * Safe close if creating shuffle block handler fails * Use implicit class. * Address comments. * Fix broken test.
* Clean up resources that are not used by pods. * Make client side send correct credentials. * Simplify cleanup logic. Cancellation is no longer instantaneous and we might clean up a little later than the given TTL. However, the tradeoff is a simpler implementation with clearer contracts about when things will and will not be cleaned up. * Remove class * Fix imports and line length. * Remove import. * Add a unit test for StagingResourcesStore. * Revamp cleanup process. - Delete resources immediately when owners do not exist - Delete resources if after they are first uploaded, they are not accessed for a certain period of time. - Resource owners are more specifically defined and can have a type (currently only uses pods) * Clarify log messages * Use a single set of credentials in resource staging server. Also refactors construction of Kubernetes Clients to unify the code paths. * Fix unit test. * Safe close if creating shuffle block handler fails * Use implicit class. * Address comments. * Fix broken test.
…runing ### What changes were proposed in this pull request? Remove `OptimizeSubqueries` from batch of `PartitionPruning` to make DPP support more cases. For example: ```sql SELECT date_id, product_id FROM fact_sk f JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s ON f.store_id = s.new_store_id ``` Before this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` After this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] ``` This is because `OptimizeSubqueries` will infer more filters, so we cannot reuse broadcasts. The following is the plan if disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- Subquery subquery#4009, [id=#284] : +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[]) : +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280] : +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[]) : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` ### Why are the changes needed? Improve DPP to support more cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test: SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- TPC-DS q58 | 40 | 20 TPC-DS q83 | 18 | 14 Closes #33664 from wangyum/SPARK-36444. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Yuming Wang <yumwang@ebay.com>
…runing ### What changes were proposed in this pull request? Remove `OptimizeSubqueries` from batch of `PartitionPruning` to make DPP support more cases. For example: ```sql SELECT date_id, product_id FROM fact_sk f JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s ON f.store_id = s.new_store_id ``` Before this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` After this PR: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262] ``` This is because `OptimizeSubqueries` will infer more filters, so we cannot reuse broadcasts. The following is the plan if disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`: ``` == Physical Plan == *(2) Project [date_id#3998, product_id#3999] +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int> : +- Subquery subquery#4009, [id=#284] : +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[]) : +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280] : +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[]) : +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] : +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) : +- *(1) ColumnarToRow : +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305] +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997] +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3))) +- *(1) ColumnarToRow +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string> ``` ### Why are the changes needed? Improve DPP to support more cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and benchmark test: SQL | Before this PR(Seconds) | After this PR(Seconds) -- | -- | -- TPC-DS q58 | 40 | 20 TPC-DS q83 | 18 | 14 Closes #33664 from wangyum/SPARK-36444. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Yuming Wang <yumwang@ebay.com> (cherry picked from commit 2310b99) Signed-off-by: Yuming Wang <yumwang@ebay.com>
Previously, when jobs were cancelled, not all of the state in the
DAGScheduler was cleaned up, leading to a slow memory leak in the
DAGScheduler. As we expose easier ways to cancel jobs, it's more
important to fix these issues.
This commit also fixes a second and less serious problem, which is that
previously, when a stage failed, not all of the appropriate stages
were cancelled. See the "failure of stage used by two jobs" test
for an example of this. This just meant that extra work was done, and is
not a correctness problem.
This commit adds 3 tests. “run shuffle with map stage failure” is
a new test to more thoroughly test this functionality, and passes on
both the old and new versions of the code. “trivial job
cancellation” fails on the old code because all state wasn’t cleaned
up correctly when jobs were cancelled (we didn’t remove the job from
resultStageToJob). “failure of stage used by two jobs” fails on the
old code because taskScheduler.cancelTasks wasn’t called for one of
the stages (see test comments).
This should be checked in before #246, which makes it easier to
cancel stages / jobs.