-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35546][Shuffle] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way #33078
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took an initial pass through the PR
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Show resolved
Hide resolved
Jenkins, test this please. |
Ok to test |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #140315 has finished for PR 33078 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any UTs added for all the issues that were identified with concurrency. Is there a problem in adding UTs for these cases?
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
Thanks @mridulm @otterc @venkata91 for reviewing the PR. Addressed majority of the comments. Hi, @Ngone51 . Can you also help review this one? Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks mostly good, had a couple of comments.
Will wait for @Ngone51, @otterc and @venkata91's reviews to complete as well.
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
...network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
Test build #140368 has finished for PR 33078 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 141 |
@Ngone51 I am leaving on slightly long-ish vacation without access to my laptop .. will it be possible for you to shepard this pr and #33034 ? |
@mridulm Sure! Enjoy your vacation! |
@zhouyejoe Could you address all the comments? Branch-3.2 is already cut, we now have very limited time to merge this into it. |
@Ngone51 Sure, will update the PR ASAP. |
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Outdated
Show resolved
Hide resolved
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #141187 has finished for PR 33078 at commit
|
Kubernetes integration test status success |
Test build #141191 has finished for PR 33078 at commit
|
...network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
Outdated
Show resolved
Hide resolved
Added a unit test for this one yesterday. |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #141208 has finished for PR 33078 at commit
|
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
Show resolved
Hide resolved
Thanks for the approve @Ngone51 ! |
LGTM. |
Looks good to me. Thanks for working on it @zhouyejoe |
Thanks for all the reviews. |
Jenkins, test this please |
Kubernetes integration test starting |
Kubernetes integration test status success |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #141284 has finished for PR 33078 at commit
|
Test build #141286 has finished for PR 33078 at commit
|
…tempts are enabled and manage concurrent access to the state in a better way ### What changes were proposed in this pull request? This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle. ### Summary of the change: When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt. This PR also refactored the management of the merged shuffle information to avoid concurrency issues. ### Why are the changes needed? Refer to the SPIP in SPARK-30602. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc. Closes #33078 from zhouyejoe/SPARK-35546. Authored-by: Ye Zhou <yezhou@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c77acf0) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Merged to master and branch-3.2 |
+CC @gengliangwang |
@@ -64,25 +68,28 @@ public boolean equals(Object other) { | |||
if (other != null && other instanceof FinalizeShuffleMerge) { | |||
FinalizeShuffleMerge o = (FinalizeShuffleMerge) other; | |||
return Objects.equal(appId, o.appId) | |||
&& appAttemptId == appAttemptId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fixed by the following.
…tempts are enabled and manage concurrent access to the state in a better way This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle. When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt. This PR also refactored the management of the merged shuffle information to avoid concurrency issues. Refer to the SPIP in SPARK-30602. No. Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc. Closes apache#33078 from zhouyejoe/SPARK-35546. Authored-by: Ye Zhou <yezhou@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c77acf0)
…tempts are enabled and manage concurrent access to the state in a better way ### What changes were proposed in this pull request? This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle. ### Summary of the change: When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt. This PR also refactored the management of the merged shuffle information to avoid concurrency issues. ### Why are the changes needed? Refer to the SPIP in SPARK-30602. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc. Closes apache#33078 from zhouyejoe/SPARK-35546. Authored-by: Ye Zhou <yezhou@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c77acf0) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
…tempts are enabled and manage concurrent access to the state in a better way This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle. When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt. This PR also refactored the management of the merged shuffle information to avoid concurrency issues. Refer to the SPIP in SPARK-30602. No. Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc. Closes #33078 from zhouyejoe/SPARK-35546. Authored-by: Ye Zhou <yezhou@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
Summary of the change:
When Executor registers with Shuffle Service, it will encode the merged shuffle dir created and also the application attemptId into the ShuffleManagerMeta into Json. Then in Shuffle Service, it will decode the Json string and get the correct merged shuffle dir and also the attemptId. If the registration comes from a newer attempt, the merged shuffle information will be updated to store the information from the newer attempt.
This PR also refactored the management of the merged shuffle information to avoid concurrency issues.
Why are the changes needed?
Refer to the SPIP in SPARK-30602.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.