-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-27665][Core] Split fetch shuffle blocks protocol from OpenBlocks #24565
Conversation
Test build #105282 has finished for PR 24565 at commit
|
Test build #105318 has finished for PR 24565 at commit
|
Ready for review, cc @cloud-fan. |
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
Outdated
Show resolved
Hide resolved
...work-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java
Outdated
Show resolved
Hide resolved
...work-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
I think 3.0 is a good chance to do this change, cc @zsxwing @JoshRosen |
Test build #105412 has finished for PR 24565 at commit
|
Test build #105418 has finished for PR 24565 at commit
|
Test build #105419 has finished for PR 24565 at commit
|
...work-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...work-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
Outdated
Show resolved
Hide resolved
Test build #105455 has finished for PR 24565 at commit
|
Test build #105480 has finished for PR 24565 at commit
|
This change makes sense, but one thing which has been in the back of my mind for a while -- I really think we should add versioning to the external shuffle service. Its very hard to users to always upgrade the external shuffle service at the same time. Without versioning and some migration period, eg. so a 3.0 spark app can talk to a 2.4 spark shuffle service, it makes upgrades really difficult. if we do that, you'd want to send the new messages only if you're talking to a new shuffle service. and maybe in 3.1 we'd remove the old code completely. I realize this is an extra headache but I'd like us to consider it. |
cc @attilapiros as I think this is touching code from your PRs (also my comment about versioning the shuffle service applies for your changes too) |
@squito good idea about versioning! I think we should have done it at the first place, and now it's a good chance to add versioning in the new shuffle protocol. |
@squito I think the best would be to introduce a new message for asking the version of the running shuffle service and making the decision based on that version. But if we would like to support 2.4 too without modify it (and adding this new message backward) then besides this new message we should introduce a new config parameter too for the shuffle service version (in Spark) and when its value is greater equal where the version message is available we can use the message to find the exact version. Question: should we do that in one of these PRs or separately? (I would be happy to work on it just let's decided where we want it and avoid parallel work). |
The thing I'm not sure is how important it is to support shuffle service 2.x + spark application 3.x. I feel it's good enough to have a shuffle service 3.x that support all versions of spark applications, but I might be wrong as I'm far away from real deploy environment. If we do want to support that, I think a reasonable choice can be: create a config to revert to the old shuffle protocol, and ask users to set this config if they see shuffle protocol related errors. We can also extend the shuffle protocol: client asks the server for its version first, and then pick the correct shuffle message to send. This might cause perf regression as it adds an extra RPC call, and I'm not sure it's worth if old shuffle service + new spark application is not a common use case. |
I think its very useful to have spark 3.0 be able to talk to a spark 2.4 shuffle service. The shuffle service can be harder to upgrade, as it effects all applications, while you could have a different spark version per application. Especially across a version breaking compatibility, this makes it easier for users to upgrade incrementally. I was originally thinking that we'd extend the And I was hoping that only if that msg was not properly handled, then you'd auto-detect that you were talking to an old shuffle server. So it would be the same number of rpcs when all the versions match, but an extra one if you're using new spark with old shuffle server, just across this initial boundary where we update this message type. But that could actually be pretty slow I guess, so a config seems like a good option as well. |
Is that always sent? Haven't traced the code but IIRC that's only the message when you register with your local shuffle service, not when you're getting data from a remote one... |
Thanks for the advice from all reviewers. I agree with @squito on the suggestion about new spark job talk to old shuffle service, and achieved by newly added config, which is the same idea with @cloud-fan. It has been done in a159f41. Please check. For the versioning, I also think it's necessary to work at this time. Maybe we can add the version in the FetchShuffleBlocks message? Check the version in the server side and choose the different way for decoding multi-version of FetchShuffleBlocks message after Spark 3.0. I think it can be done in a separate PR and JIRA. The added config can achieve the compatibility between <3.0 and 3.0 in this PR. WDYT? @vanzin @attilapiros @cloud-fan @squito. |
Let's start doing versioning in the new shuffle protocol. The new message |
I think the versioning change should be its own jira & pr -- sorry I started this discussion here as it related to this but now I think it should be tracked separately. I filed https://issues.apache.org/jira/browse/SPARK-27780 and tried to capture the discussion here so far, but please add to it. |
Test build #105574 has finished for PR 24565 at commit
|
@squito thanks for opening the ticket! Agree with adding versioning separately. I have no more comments on this PR. Let's wait a few days and see if others have objections to merging it. |
Resolve conflict with #24499. |
Test build #105831 has finished for PR 24565 at commit
|
thanks, merging to master! |
Thanks for all your help. |
…enerate the shuffle files After the newly added shuffle block fetching protocol in #24565, we can keep this work by extending the FetchShuffleBlocks message. ### What changes were proposed in this pull request? In this patch, we achieve the indeterminate shuffle rerun by reusing the task attempt id(unique id within an application) in shuffle id, so that each shuffle write attempt has a different file name. For the indeterministic stage, when the stage resubmits, we'll clear all existing map status and rerun all partitions. All changes are summarized as follows: - Change the mapId to mapTaskAttemptId in shuffle related id. - Record the mapTaskAttemptId in MapStatus. - Still keep mapId in ShuffleFetcherIterator for fetch failed scenario. - Add the determinate flag in Stage and use it in DAGScheduler and the cleaning work for the intermediate stage. ### Why are the changes needed? This is a follow-up work for #22112's future improvment[1]: `Currently we can't rollback and rerun a shuffle map stage, and just fail.` Spark will rerun a finished shuffle write stage while meeting fetch failures, currently, the rerun shuffle map stage will only resubmit the task for missing partitions and reuse the output of other partitions. This logic is fine in most scenarios, but for indeterministic operations(like repartition), multiple shuffle write attempts may write different data, only rerun the missing partition will lead a correctness bug. So for the shuffle map stage of indeterministic operations, we need to support rolling back the shuffle map stage and re-generate the shuffle files. ### Does this PR introduce any user-facing change? Yes, after this PR, the indeterminate stage rerun will be accepted by rerunning the whole stage. The original behavior is aborting the stage and fail the job. ### How was this patch tested? - UT: Add UT for all changing code and newly added function. - Manual Test: Also providing a manual test to verify the effect. ``` import scala.sys.process._ import org.apache.spark.TaskContext val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10) val indeterminateStage1 = determinateStage0.repartition(200) val indeterminateStage2 = indeterminateStage1.repartition(200) val indeterminateStage3 = indeterminateStage2.repartition(100) val indeterminateStage4 = indeterminateStage3.repartition(300) val fetchFailIndeterminateStage4 = indeterminateStage4.map { x => if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 && TaskContext.get.stageAttemptNumber == 0) { throw new Exception("pkill -f -n java".!!) } x } val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200) val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length ``` It's a simple job with multi indeterminate stage, it will get a wrong answer while using old Spark version like 2.2/2.3, and will be killed after #22112. With this fix, the job can retry all indeterminate stage as below screenshot and get the right result.  Closes #25620 from xuanyuanking/SPARK-25341-8.27. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This PR takes over #19788. After we split the shuffle fetch protocol from `OpenBlock` in #24565, this optimization can be extended in the new shuffle protocol. Credit to yucai, closes #19788. ### What changes were proposed in this pull request? This PR adds the support for continuous shuffle block fetching in batch: - Shuffle client changes: - Add new feature tag `spark.shuffle.fetchContinuousBlocksInBatch`, implement the decision logic in `BlockStoreShuffleReader`. - Merge the continuous shuffle block ids in batch if needed in ShuffleBlockFetcherIterator. - Shuffle server changes: - Add support in `ExternalBlockHandler` for the external shuffle service side. - Make `ShuffleBlockResolver.getBlockData` accept getting block data by range. - Protocol changes: - Add new block id type `ShuffleBlockBatchId` represent continuous shuffle block ids. - Extend `FetchShuffleBlocks` and `OneForOneBlockFetcher`. - After the new shuffle fetch protocol completed in #24565, the backward compatibility for external shuffle service can be controlled by `spark.shuffle.useOldFetchProtocol`. ### Why are the changes needed? In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. However, as the original approach, each reducer needs to fetch those 10 reducer blocks one by one. This way needs many IO and impacts performance. This PR is to support fetching those continuous shuffle blocks in one IO (batch way). See below example: The shuffle block is stored like below:  The ShuffleId format is s"shuffle_$shuffleId_$mapId_$reduceId", referring to BlockId.scala. In adaptive execution, one reducer may want to read output for reducer 5 to 14, whose block Ids are from shuffle_0_x_5 to shuffle_0_x_14. Before this PR, Spark needs 10 disk IOs + 10 network IOs for each output file. After this PR, Spark only needs 1 disk IO and 1 network IO. This way can reduce IO dramatically. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Add new UT. Integrate test with setting `spark.sql.adaptive.enabled=true`. Closes #26040 from xuanyuanking/SPARK-9853. Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Co-authored-by: yucai <yyu1@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
As the current approach in OneForOneBlockFetcher, we reuse the OpenBlocks protocol to describe the fetch request for shuffle blocks, and it causes the extension work for shuffle fetching like apache#19788 and apache#24110 very awkward. In this PR, we split the fetch request for shuffle blocks from OpenBlocks which named FetchShuffleBlocks. It's a loose bind with ShuffleBlockId and can easily extend by adding new fields in this protocol. Existing and new added UT. Closes apache#24565 from xuanyuanking/SPARK-27665. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 8949bc7)
What changes were proposed in this pull request?
As the current approach in OneForOneBlockFetcher, we reuse the OpenBlocks protocol to describe the fetch request for shuffle blocks, and it causes the extension work for shuffle fetching like #19788 and #24110 very awkward.
In this PR, we split the fetch request for shuffle blocks from OpenBlocks which named FetchShuffleBlocks. It's a loose bind with ShuffleBlockId and can easily extend by adding new fields in this protocol.
How was this patch tested?
Existing and new added UT.