[SPARK-32923][CORE][SHUFFLE] Handle indeterminate stage retries for push-based shuffle #33034
cc @mridulm @Victsm @otterc @zhouyejoe @Ngone51 Please take a look. This is still a work in progress, as tests are being added. I am raising the PR now because it makes protocol changes, and it would be better to land those before the branch-3.2 cut. That way, at least the protocol changes can be merged if review of the implementation details takes more time. Thanks :)
Force-pushed from de08b6e to 39e5df6.
We need to handle the case of how shuffleSequenceId needs to be configured.
Resolved review thread (outdated): core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Resolved review thread (outdated): common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
Changes to fetch protocols seem unnecessary because fetch will never request shuffle data of an older shuffleSequenceId.
Resolved review thread (outdated): ...n/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java
Resolved review thread (outdated): common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
Resolved review thread (outdated): common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
Resolved review thread (outdated): .../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Some more comments
Resolved review thread (outdated): ...ffle/src/main/java/org/apache/spark/network/shuffle/protocol/AbstractFetchShuffleBlocks.java
Resolved review thread (outdated): ...work-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java
@mridulm @otterc @Ngone51 For now I have updated the PR to include the changes from SPARK-35546, and will remove them once that PR is merged. Please review.
Force-pushed from 6e21a27 to 92ada56.
Resolved review thread (outdated): ...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@otterc feels this PR is quite big, and I agree. I will break it into two PRs, client and server, and keep this one for reference.
Is this still WIP?
Yeah, I am in the process of breaking this into two PRs. Will update here once that is done.
After offline discussions with @mridulm, we decided not to break this PR into two. I will fix one pending change and remove the WIP tag.
@venkata91 Could you please provide a reason for not breaking this PR into multiple parts? This PR touches a lot of files on both the client and the server side. It changes the protocols. On the client, it touches the driver side, the push side, and the fetch side.
I tend to agree with not breaking it up, because of the RC timeline. Usually, multiple PRs take more time to get merged than one PR.
@mridulm Fixed the conflict, should be good now. Will wait for the tests to run completely.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141932 has finished for PR 33034 at commit
Thanks @venkata91, the tests are passing with the update.
Resolved review thread: .../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@venkata91 Thanks for working on this. LGTM.
Resolved review thread: .../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Resolved review thread: .../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
…ush-based shuffle

### What changes were proposed in this pull request?
[[SPARK-23243](https://issues.apache.org/jira/browse/SPARK-23243)] and [[SPARK-25341](https://issues.apache.org/jira/browse/SPARK-25341)] addressed cases of stage retries for indeterminate stage involving operations like repartition. This PR addresses the same issues in the context of push-based shuffle. Currently there is no way to distinguish the current execution of a stage for a shuffle ID. Therefore the changes explained below are necessary.

Core changes are summarized as follows:
1. Introduce a new variable `shuffleMergeId` in `ShuffleDependency` which is monotonically increasing value tracking the temporal ordering of execution of <stage-id, stage-attempt-id> for a shuffle ID.
2. Correspondingly make changes in the push-based shuffle protocol layer in `MergedShuffleFileManager`, `BlockStoreClient` passing the `shuffleMergeId` in order to keep track of the shuffle output in separate files on the shuffle service side.
3. `DAGScheduler` increments the `shuffleMergeId` tracked in `ShuffleDependency` in the cases of a indeterministic stage execution
4. Deterministic stage will have `shuffleMergeId` set to 0 as no special handling is needed in this case and indeterminate stage will have `shuffleMergeId` starting from 1.

### Why are the changes needed?
New protocol changes are needed due to the reasons explained above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new unit tests in `RemoteBlockPushResolverSuite, DAGSchedulerSuite, BlockIdSuite, ErrorHandlerSuite`

Closes #33034 from venkata91/SPARK-32923.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c039d99)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Merged to master and branch-3.2. +CC @gengliangwang. Thanks for fixing this, @venkata91. This was the last patch for the push-based shuffle SPIP; the only pending task is documentation.
@venkata91 Thanks for the work!
Thanks for the thorough reviews @mridulm @Ngone51 @otterc @zhouyejoe. Learned quite a lot :)
…when finalize request for higher shuffleMergeId is received

### What changes were proposed in this pull request?
Clean up older shuffleMergeId shuffle files when finalize request for higher shuffleMergeId is received when no blocks pushed for the corresponding shuffleMergeId. This is identified as part of #33034 (comment).

### Why are the changes needed?
Without this change, older shuffleMergeId files won't be cleaned up properly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added changes to existing unit test to address this case.

Closes #33605 from venkata91/SPARK-32923-follow-on.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit d816949)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
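The cleanup described in this follow-up can be pictured with a small in-memory model. This is only a sketch under assumptions: `MergeStateSketch`, its method names, and the `merged_...` file names are hypothetical and are not the actual `RemoteBlockPushResolver` code. Ignoring pushes that carry an older `shuffleMergeId` is also an assumption made here to keep the model consistent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the shuffle service; file names stand in for real files. */
final class MergeStateSketch {
    /** Latest shuffleMergeId seen per shuffle ID. */
    private final Map<Integer, Integer> latestMergeId = new HashMap<>();
    /** Merged file names currently held per shuffle ID. */
    private final Map<Integer, Set<String>> files = new HashMap<>();

    /** Records a pushed block; pushes for an older shuffleMergeId are ignored (assumption). */
    void onBlockPushed(int shuffleId, int shuffleMergeId, int reduceId) {
        int known = latestMergeId.getOrDefault(shuffleId, -1);
        if (shuffleMergeId < known) {
            return; // stale push from an earlier execution of the stage
        }
        if (shuffleMergeId > known) {
            cleanUpOlder(shuffleId, shuffleMergeId);
        }
        files.computeIfAbsent(shuffleId, k -> new TreeSet<>())
             .add("merged_" + shuffleId + "_" + shuffleMergeId + "_" + reduceId);
    }

    /**
     * Finalize request. If it carries a higher shuffleMergeId than the one tracked,
     * the older files are removed even though no block was pushed for the new id.
     * Returns the names of the files that were cleaned up.
     */
    Set<String> onFinalize(int shuffleId, int shuffleMergeId) {
        int known = latestMergeId.getOrDefault(shuffleId, -1);
        if (shuffleMergeId > known) {
            return cleanUpOlder(shuffleId, shuffleMergeId);
        }
        return new TreeSet<>();
    }

    /** Moves the tracked id forward and drops everything held for the older id. */
    private Set<String> cleanUpOlder(int shuffleId, int newMergeId) {
        latestMergeId.put(shuffleId, newMergeId);
        Set<String> removed = files.remove(shuffleId);
        return removed == null ? new TreeSet<>() : removed;
    }

    /** Files currently held for a shuffle ID (empty if none). */
    Set<String> filesFor(int shuffleId) {
        return files.getOrDefault(shuffleId, new TreeSet<>());
    }
}
```

In this model, a finalize request for `shuffleMergeId` 2 that arrives while only files for id 1 exist removes those files, which is the case the follow-up says was previously missed.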
### What changes were proposed in this pull request?
Cleanup `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplications. Currently this is based off of #33034 will remove those changes once it is merged and remove the WIP at that time.

### Why are the changes needed?
Minor cleanup to make code more readable.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No tests, just changing log messages

Closes #33561 from venkata91/SPARK-36332.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit ab89710)
Signed-off-by: yi.wu <yi.wu@databricks.com>
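The idea behind this log cleanup is to let the partition object describe itself once, instead of repeating its fields in every message. A minimal sketch follows, assuming a hypothetical `PartitionInfoSketch` class; the field set and message wording are illustrative and are not the real `AppShufflePartitionInfo`.

```java
/** Hypothetical stand-in for a partition-info object; the fields are assumptions. */
final class PartitionInfoSketch {
    private final String appId;
    private final int shuffleId;
    private final int shuffleMergeId;
    private final int reduceId;

    PartitionInfoSketch(String appId, int shuffleId, int shuffleMergeId, int reduceId) {
        this.appId = appId;
        this.shuffleId = shuffleId;
        this.shuffleMergeId = shuffleMergeId;
        this.reduceId = reduceId;
    }

    /** Single place that decides how a partition is rendered in log messages. */
    @Override
    public String toString() {
        return "appId " + appId + " shuffleId " + shuffleId
            + " shuffleMergeId " + shuffleMergeId + " reduceId " + reduceId;
    }
}

final class LogMessageDemo {
    /** Builds a message by delegating to toString() rather than listing each field again. */
    static String mergeFailed(PartitionInfoSketch partition) {
        return "Failed to merge block for " + partition;
    }

    public static void main(String[] args) {
        System.out.println(mergeFailed(new PartitionInfoSketch("app-1", 4, 2, 7)));
    }
}
```

With this shape, adding a field such as `shuffleMergeId` to the description changes one method instead of every log call site.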
### What changes were proposed in this pull request?
[SPARK-23243] and [SPARK-25341] addressed stage retries for indeterminate stages involving operations like repartition. This PR addresses the same issues in the context of push-based shuffle. Currently there is no way to distinguish the current execution of a stage from earlier ones for a given shuffle ID. Therefore the changes explained below are necessary.

Core changes are summarized as follows:
1. Introduce a new variable `shuffleMergeId` in `ShuffleDependency`, a monotonically increasing value that tracks the temporal ordering of executions of <stage-id, stage-attempt-id> for a shuffle ID.
2. Correspondingly, change the push-based shuffle protocol layer in `MergedShuffleFileManager` and `BlockStoreClient` to pass the `shuffleMergeId`, so that the shuffle output is tracked in separate files on the shuffle service side.
3. `DAGScheduler` increments the `shuffleMergeId` tracked in `ShuffleDependency` when an indeterminate stage is executed.
4. A deterministic stage has `shuffleMergeId` set to 0, as no special handling is needed in this case; an indeterminate stage has `shuffleMergeId` starting from 1.

### Why are the changes needed?
New protocol changes are needed due to the reasons explained above.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added new unit tests in `RemoteBlockPushResolverSuite`, `DAGSchedulerSuite`, `BlockIdSuite`, and `ErrorHandlerSuite`.
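The numbering rule in the core changes can be sketched as follows. This is an illustration under assumptions, not Spark's implementation: `ShuffleDependencySketch`, `beginExecution`, and the `shuffleMerged_...` file-name pattern are hypothetical names chosen for the example. It shows a deterministic stage staying at 0, an indeterminate stage counting up from 1 on each execution, and the id being embedded in the merged file name so each execution's output stays separate.

```java
/** Hypothetical stand-in for the per-shuffle state; not Spark's ShuffleDependency. */
final class ShuffleDependencySketch {
    private final int shuffleId;
    private final boolean indeterminate;
    // 0 is used for deterministic stages; indeterminate executions count up from 1.
    private int shuffleMergeId = 0;

    ShuffleDependencySketch(int shuffleId, boolean indeterminate) {
        this.shuffleId = shuffleId;
        this.indeterminate = indeterminate;
    }

    /** Called once per stage execution (first run or retry); returns the id for that execution. */
    int beginExecution() {
        if (indeterminate) {
            shuffleMergeId++; // each execution of an indeterminate stage gets a fresh id
        }
        return shuffleMergeId;
    }

    /** Hypothetical file name: embedding the id keeps each execution's merged output apart. */
    String mergedFileName(String appId, int reduceId) {
        return "shuffleMerged_" + appId + "_" + shuffleId + "_" + shuffleMergeId
            + "_" + reduceId + ".data";
    }
}

final class ShuffleMergeIdDemo {
    public static void main(String[] args) {
        ShuffleDependencySketch deterministic = new ShuffleDependencySketch(3, false);
        ShuffleDependencySketch indeterminate = new ShuffleDependencySketch(4, true);

        deterministic.beginExecution();
        deterministic.beginExecution(); // retry: the id stays at 0
        System.out.println(deterministic.mergedFileName("app-1", 0));

        indeterminate.beginExecution(); // first execution: id 1
        System.out.println(indeterminate.mergedFileName("app-1", 0));
        indeterminate.beginExecution(); // retry: id 2, so a different file name
        System.out.println(indeterminate.mergedFileName("app-1", 0));
    }
}
```

Because the retry of the indeterminate stage yields a different file name, blocks pushed by an earlier execution cannot be merged into the output of the current one in this model.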