
[SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks #29855

Closed


@Victsm Victsm commented Sep 23, 2020

What changes were proposed in this pull request?

This is the first patch for SPIP SPARK-30602 for push-based shuffle.
Summary of changes:

  • Introduce a new API in ExternalBlockStoreClient to push blocks to a remote shuffle service.
  • Leverage the streaming upload functionality in SPARK-6237 to enable ExternalBlockHandler to delegate the handling of block push requests to MergedShuffleFileManager.
  • Propose the API for MergedShuffleFileManager, which defines the core logic on the shuffle service side for handling block push requests. The actual implementation of this API is deferred to a later PR to restrict the size of this PR.
  • Introduce OneForOneBlockPusher to enable pushing blocks to remote shuffle services in the shuffle RPC layer.
  • Add new protocols in the shuffle RPC layer to support these functionalities.
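As a rough illustration of the push model above, here is a minimal Python sketch (not Spark's actual Java implementation; the block-id format and the routing rule are assumptions for illustration) of how a block pusher might group map-output blocks by the remote shuffle service responsible for merging them:

```python
# Illustrative sketch only: group shuffle blocks by the remote shuffle service
# that should merge them, routing each reduce partition consistently to one
# merger so all blocks of a partition end up at the same service.
def group_blocks_by_merger(blocks, mergers):
    """blocks: list of (shuffle_id, map_id, reduce_id) tuples.
    mergers: list of "host:port" strings for the available shuffle services."""
    groups = {}
    for shuffle_id, map_id, reduce_id in blocks:
        # Consistent routing per reduce partition (a stand-in for the real policy).
        dest = mergers[reduce_id % len(mergers)]
        groups.setdefault(dest, []).append(
            "shufflePush_%d_%d_%d" % (shuffle_id, map_id, reduce_id))
    return groups
```

In the actual change, the push itself goes through OneForOneBlockPusher and the new shuffle RPC protocols; this sketch only shows the grouping idea.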

Why are the changes needed?

Refer to the SPIP in SPARK-30602

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen mshen@linkedin.com
Co-authored-by: Chandni Singh chsingh@linkedin.com
Co-authored-by: Ye Zhou yezhou@linkedin.com

Victsm and others added 15 commits September 18, 2020 21:39
The following changes are included in this patch.
In addition, fixed a potential block duplicate issue when speculative execution is enabled, and improved test coverage.

commit 7e134c2b75c8882474a67c15036087eb8a02caef
Author: Chandni Singh <chsingh@linkedin.com>
Date:   Tue Apr 21 21:54:46 2020 -0700

    LIHADOOP-52202 Create merge_manager under app local dirs

    RB=2069854
    G=superfriends-reviewers
    R=mshen,yezhou
    A=chsingh

commit 63dcda9309fc06c5f1fb6e7268df1d7416db49c7
Author: Chandni Singh <chsingh@linkedin.com>
Date:   Wed Apr 1 12:15:21 2020 -0700

    LIHADOOP-51889 Divide remote fetches into smaller chunks

    RB=2029681

commit 35298465155ec10e6ee2caf1adc0e78717dc6fed
Author: Chandni Singh <chsingh@linkedin.com>
Date:   Thu Mar 19 17:47:40 2020 -0700

    LIHADOOP-51889 Writing last chunk offsets to merge index file

    RB=2016700
    BUG=LIHADOOP-51889
    G=superfriends-reviewers
    R=mshen
    A=mshen

commit bbb53ec0fdfa0aebda954ede17a9f6e217607a53
Author: Min Shen <mshen@linkedin.com>
Date:   Thu Dec 19 08:46:34 2019 -0800

    Shuffle server and client properly handles merged block fetch failure.
    Use file length as merged shuffle block size when serving merged shuffle block.

commit 6718074c6a6a98b1d66d4fdff6bf08fb266ce32e
Author: Min Shen <mshen@linkedin.com>
Date:   Mon Nov 18 14:19:20 2019 -0800

    Netty protocol for DAGScheduler control message

commit 52e4dfade2e004fbc39fc60937342a9a57872680
Author: Min Shen <mshen@linkedin.com>
Date:   Sun Sep 8 18:44:09 2019 -0700

    Netty protocol for remote block push, pass 3

commit e9db4cc1ae56e9722a598b0011a10e55e84bf19c
Author: Min Shen <mshen@linkedin.com>
Date:   Thu Sep 5 18:29:24 2019 -0700

    Netty protocol for remote block push, pass 2

commit 7627ecf54292edda4a133e596f53306e7af76100
Author: Min Shen <mshen@linkedin.com>
Date:   Fri Aug 30 08:54:08 2019 -0700

    Netty protocol for remote block push, pass 1
RB=2096937
G=spark-reviewers
R=chsingh,mshen
A=mshen
…ResolverSuite

RB=2101153
BUG=LIHADOOP-53438
G=spark-reviewers
R=mshen,yezhou
A=yezhou
RB=2104829
BUG=LIHADOOP-53496
G=spark-reviewers
R=yezhou,mshen
A=mshen
…les in NM

RB=2130238
BUG=LIHADOOP-53700
G=spark-reviewers
R=mshen,chsingh
A=chsingh
… service is unable to create them

RB=2146753
BUG=LIHADOOP-53940
G=spark-reviewers
R=mshen,yezhou
A=mshen,yezhou
…l dirs provided to executor and the shuffle service and not log all exceptions at error/warning level

RB=2152736
BUG=LIHADOOP-53496,LIHADOOP-54059
G=spark-reviewers
R=yezhou,mshen
A=mshen
RB=2166324
BUG=LIHADOOP-54379
G=spark-reviewers
R=yezhou,mshen
A=mshen
RB=2166258
BUG=LIHADOOP-54370
G=spark-reviewers
R=mshen,yezhou
A=mshen
… a shuffle chunk fails

RB=2203642
BUG=LIHADOOP-52494
G=spark-reviewers
R=yzhou,mshen,vsowrira
A=mshen
RB=2253833
G=spark-reviewers
R=mshen,vsowrira,mmuralid,yezhou
A=mshen
…al host with a consistent view of app local dirs among different executors

RB=2261073
BUG=LIHADOOP-55315
G=spark-reviewers
R=chsingh,mshen,vsowrira,mmuralid
A=mmuralid,chsingh
… local dirs update in shuffle service. Also fixing a memory leak.

RB=2281730
BUG=LIHADOOP-55654
G=spark-reviewers
R=vsowrira,chsingh,mshen
A=vsowrira,chsingh

Victsm commented Sep 23, 2020

A few clarifications on this PR:
The entire netty RPC layer change for push-based shuffle is ~4000 LOC in our current implementation. We plan to break it down into 3 PRs for easier review:

  • The first, this PR, focuses on the foundation for supporting the block push functionality.
  • The second PR will provide the actual implementation of MergedShuffleFileManager, as well as the integration with YARNShuffleService.
  • The third PR will provide the read-path implementation supporting fetching a merged shuffle file as a sequence of chunks.

In addition, there is some additional refactoring we could do with this PR.
For example, we reuse RetryingBlockFetcher and BlockFetchingListener for block push as well, which makes their names no longer appropriate.
We didn't make that change in this PR so as to reduce the number of files we touch and keep the review easier.
We can either send out a separate PR just for this refactoring or update this PR, depending on the reviewers' preference.
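The reuse being described, a single retry loop shared by block fetch and block push, can be sketched in Python as follows (names and behavior are illustrative, not Spark's RetryingBlockFetcher itself):

```python
def transfer_with_retry(start_transfer, should_retry, max_retries=3):
    """Generic retry loop usable for both block fetch and block push.

    start_transfer: callable performing one transfer attempt.
    should_retry: callable deciding whether a given failure is retriable.
    """
    attempt = 0
    while True:
        try:
            return start_transfer()
        except Exception as e:
            attempt += 1
            if attempt > max_retries or not should_retry(e):
                raise  # retries exhausted, or a non-retriable failure
```

The same loop serves both directions; only `start_transfer` and the retry policy differ, which is why a fetch-specific name is misleading.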

Also, regarding the approach to encode the PushBlockStream header into the exception message to be returned to the client, this is to minimize changes to the existing Netty protocol, so that it's easier to introduce such a change to clusters actively using the existing protocol.
We are open to suggestions.
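To make the encoding idea concrete, here is a small Python sketch (the message texts here are hypothetical, not Spark's actual constants) of embedding the pushed block's identity in the error message on the server and recovering it on the client:

```python
# Hypothetical failure-reason suffix appended by the server; the real text
# lives in Spark's shuffle RPC layer.
TOO_LATE_SUFFIX = " is received after merged shuffle is finalized"

def make_push_failure_message(block_id, reason_suffix):
    # Server side: embed the pushed block's id in the exception message, so the
    # client learns which block failed without any new wire-protocol message.
    return block_id + reason_suffix

def parse_failed_block(message, pushed_block_ids):
    # Client side: recover the failed block id from the opaque error string.
    for block_id in pushed_block_ids:
        if message.startswith(block_id):
            return block_id
    return None
```

This keeps the existing Netty protocol unchanged at the cost of string-based error inspection, which is the trade-off discussed in the review below.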


@otterc otterc left a comment


Looks good! Thanks @Victsm


mridulm commented Sep 23, 2020

ok to test


mridulm commented Sep 23, 2020

+CC @jiangxb1987


mridulm commented Sep 23, 2020

+CC @attilapiros, @mccheah


mridulm commented Sep 23, 2020

Something seems off about the jenkins test. @shaneknapp can you please take a look ?


mridulm commented Sep 23, 2020

+CC @tgravescs


Victsm commented Sep 23, 2020

Fixed the Java style issue and the one unit-test failure.
Test build should be clean now.


SparkQA commented Sep 23, 2020

Test build #129045 has finished for PR 29855 at commit 2bdf800.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 24, 2020

Test build #129046 has finished for PR 29855 at commit 3e9e9e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34306/


SparkQA commented Oct 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34306/


SparkQA commented Oct 12, 2020

Test build #129699 has finished for PR 29855 at commit f016b39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```java
    ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX))));
assertFalse(handler.shouldRetryError(new RuntimeException(new ConnectException())));
assertTrue(handler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
    ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
```
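The classification these assertions exercise can be sketched as follows (a Python sketch with hypothetical message texts; the real constants live in ErrorHandler.BlockPushErrorHandler):

```python
# Hypothetical message fragments standing in for Spark's constants.
TOO_LATE_MESSAGE_SUFFIX = "received after merged shuffle is finalized"
BLOCK_APPEND_COLLISION_PREFIX = "Couldn't find an opportunity to write block"

def should_retry_error(message):
    """Decide whether a failed block push is worth retrying, based only on
    the error message string (mirroring the string-based checks above)."""
    if message is None:
        return False
    # A push arriving after the merge was finalized is pointless to retry.
    if TOO_LATE_MESSAGE_SUFFIX in message:
        return False
    # Connection-level failures are not retried by this handler.
    if "java.net.ConnectException" in message:
        return False
    # Everything else, including an append collision, is considered retriable.
    return True
```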
@Ngone51 Ngone51 Oct 13, 2020


I'm wondering why this error is retriable. My understanding is that this error is returned when another attempt has already written (or is currently writing; not sure if that case is also included) the same shuffle block. So is the retry meant to handle the case where the writing attempt fails to write the block completely?

@Victsm (author) replied:

It's not the same block, but another block belonging to the same shuffle partition.

@Ngone51 replied:

Ah, sorry. I do mean the same shuffle partition rather than the same block.

WDYT?

@Victsm (author) replied:

It's retriable because this block hasn't been appended to the merged shuffle file and the merge operation hasn't been finalized yet.

@Victsm (author) replied:

When we append a block to the merged shuffle file, we either append it completely or end up effectively writing nothing to the file.
If the first attempt failed because of a collision, then that block effectively hasn't been appended to the file yet, which makes it retriable.
The 2nd PR, to be sent out soon, will include more details on this part.

@Ngone51 replied:

If the first attempt failed because of collision, then that block effectively hasn't been appended to the file yet, which makes it retriable.

What makes the first attempt fail because of a collision? In my understanding, there are two possibilities:

  1. the same partition has already been merged by another task attempt
  2. the same partition is being merged by another task attempt

For case 1, do we still need to retry? If we do retry in this case, doesn't it return BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX again?

For case 2, I think it may make sense to retry in case that attempt doesn't merge the partition successfully in the end.

@Victsm (author) replied:

We actually distinguish between a block duplication and a block collision on the server side.

Block duplication is when the exact same shuffle partition block gets pushed by different executors, due to speculation or maybe a task retry.
The server side is able to tell when block duplication happens, whether one duplicate block is sent after the first has been successfully merged or both blocks are received at the same time.
For a duplicate block, the server will respond with success to the client, so the client won't retry sending it.
In the case of speculation, when 2 clients might be sending the same block at the same time, the server will respond with success to one of the two and let the other write; if that write fails, the corresponding client will retry if the failure is retriable.

On the other hand, a block collision is not about the exact same shuffle partition block, but 2 different blocks belonging to the same shuffle partition being sent to the same shuffle service at the same time.
Since the shuffle service needs to append one block completely before appending the content of the next block belonging to the same shuffle partition, when these blocks arrive at one shuffle service at the same time, we encounter a block collision.
A block collision might not immediately lead to a collision failure being sent back to the client, since the server will buffer the blocks for a short period of time and make a few attempts before giving up.
When the shuffle service gives up, the client will receive the collision failure.
Receiving the collision failure is an indication that this block hasn't been merged yet, and thus it's OK to retry.
Of course, it's entirely possible that by the time the retry happens, a speculative task has already pushed the block and successfully merged it.
If that's the case, the retry would be treated as a block duplication instead of a block collision, and the client will receive a success response.

I hope this serves as an overview of what's to come in the next PR.

@Ngone51 replied:

I see. Thanks for the detailed explanation.
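The duplication-vs-collision semantics described in this thread can be modeled with a toy Python sketch (an illustration of the described behavior only, not the MergedShuffleFileManager implementation to come in the next PR; all names are hypothetical):

```python
class PartitionMerger:
    """Toy model of the server-side decision: ack duplicates with success,
    report a collision when a different block of the same partition is
    already being appended."""

    def __init__(self):
        self.merged = set()   # block ids already appended to the merged file
        self.active = {}      # partition -> block id currently being appended

    def receive(self, partition, block_id):
        if block_id in self.merged:
            return "SUCCESS"          # duplicate: already merged, just ack it
        if partition in self.active:
            if self.active[partition] == block_id:
                return "SUCCESS"      # duplicate push racing with itself
            return "COLLISION"        # different block, same partition: retriable
        self.active[partition] = block_id
        return "APPENDING"

    def finish(self, partition, block_id):
        # The block was appended completely; the partition is free again.
        self.active.pop(partition, None)
        self.merged.add(block_id)
```

A retried block that was merged by a speculative task in the meantime hits the `SUCCESS` branch, matching the duplication handling described above.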


Victsm commented Oct 13, 2020

Thanks for the additional review comments from @jiangxb1987 @Ngone51. I have now resolved all pending issues.

Ping @attilapiros @tgravescs @mridulm to see if there are any additional concerns on the PR and whether we can get a +1.


Victsm commented Oct 13, 2020

The most recent test failure does not seem related to this patch.


Ngone51 commented Oct 14, 2020

LGTM


mridulm commented Oct 14, 2020

Thanks for the review @Ngone51 !
+CC @tgravescs, @attilapiros, @jiangxb1987, @otterc any additional comments? Or are all concerns resolved?


otterc commented Oct 14, 2020

Looks good to me.


SparkQA commented Oct 14, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34371/


SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34371/


SparkQA commented Oct 15, 2020

Test build #129765 has finished for PR 29855 at commit 2c95f18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros

I do not like the error message check (the string contains) but as it is promised to be addressed in a follow-up PR (#29855 (comment)), it is fine for now.

LGTM

@tgravescs

changes look fine


mridulm commented Oct 15, 2020

Thanks for the reviews @attilapiros, @tgravescs, @otterc, @jiangxb1987, @Ngone51 Merging to master.

@asfgit asfgit closed this in 82eea13 Oct 15, 2020

mridulm commented Oct 15, 2020

Thanks for working on this @Victsm ! Looking forward to the next set of patches on push based shuffle :-)

@dongjoon-hyun

Thank you all!

asfgit pushed a commit that referenced this pull request Jul 26, 2021
…ush operations

### What changes were proposed in this pull request?
This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514)
In this PR, the following changes are made:

1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse.
2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push.
3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push.

### Why are the changes needed?
To make code cleaner without sacrificing backward compatibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #33340 from Victsm/SPARK-32915-followup.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
zhouyejoe pushed a commit to linkedin/spark that referenced this pull request Aug 3, 2021
domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
wangyum pushed a commit that referenced this pull request May 26, 2023
wangyum pushed a commit that referenced this pull request May 26, 2023