-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-36266][SHUFFLE] Rename classes in shuffle RPC used for block push operations #33340
Conversation
… push. Add BlockPushingListener for block push.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a quick pass.
Will wait for @Ngone51, @attilapiros, @dongjoon-hyun, @tgravescs, @jiangxb1987 and @otterc to also take a pass since they all reviewed the original PR.
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
Show resolved
Hide resolved
Test failure is unrelated to this change. |
I haven't taken a deep look into code but just wondering according to the PR description. I know some code was added in 3.1 but push-based wasn't usable at that time. So, do we have to handle the compatibility issue? I also noticed a similar issue (see #33078 (comment)) in another PR and @mridulm told me these kinds of cases don't have to be a compatibility issue. |
I see why the adapter pattern is used here over simple renames. You also mentioned the reason: "without sacrificing backward compatibility" but could you explain the reason a little bit more in the description to have this information for the future reference. |
@attilapiros you are right. The backward compatibility is meant to keep the existing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just have few comments.
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
Show resolved
Hide resolved
...network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
Show resolved
Hide resolved
...network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
Outdated
Show resolved
Hide resolved
...network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
Outdated
Show resolved
Hide resolved
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
Outdated
Show resolved
Hide resolved
BTW, I think it's better to use a separate JIRA id for this PR instead of a follow-up (which usually has less change). |
Created a separate ticket for this change under SPARK-36266 and addressed the additional comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. Just a nit. Thanks for working on this @Victsm
@@ -25,6 +25,8 @@ | |||
* code reuse for handling block push and fetch retry. | |||
*/ | |||
public interface BlockPushingListener extends BlockTransferListener { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Are we keeping BlockPushingListener
? My suggestion was to call it BlockPushListener
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think BlockPushingListener
is more consistent with `BlockFetchingListener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, but BlockPushListener
sounds better and is consistent with BlockTransferListener
. Not a big deal though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockPushingListener
is supposed to be a peer to BlockFetchingListener
. Their names being consistent feels more appropriate.
* particular, the listener will be invoked exactly once per blockId, with a success or failure. | ||
*/ | ||
public class RetryingBlockFetcher { | ||
public class RetryingBlockTransferor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This's also public, although I have no idea how end-users could use it directly.
Shall we mark it as private from now? cc @mridulm
.../network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
Outdated
Show resolved
Hide resolved
LGTM if tests pass. |
The tests seem to have been abnormally disrupted based on what I can see from the log. |
ok to test |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #141598 has finished for PR 33340 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Victsm I suspect the merged code which is used for running the unittests against introduces a build error:
[error] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala:335:76: not found: type BlockFetchingListener
[error] val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
[error] ^
[error] one error found
Could you please rebase/merge this on/with top of the master and check the ShuffleBlockPusherSuite.scala
after the rebase/merge?
Now I am sure. Line 335 in master: spark/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala Line 335 in 70a1586
|
...on/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
Outdated
Show resolved
Hide resolved
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conditional LGTM: some nits (not blocking) but it must be rebased/merged onto/with master and the test should pass.
Thanks for the reviews @Ngone51, @attilapiros, @otterc ! |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #141618 has finished for PR 33340 at commit
|
Kubernetes integration test unable to build dist. exiting with code: 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, pending tests
Test build #141652 has finished for PR 33340 at commit
|
…ush operations ### What changes were proposed in this pull request? This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514) In this PR, the following changes are made: 1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse. 2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push. 3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push. ### Why are the changes needed? To make code cleaner without sacrificing backward compatibility. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #33340 from Victsm/SPARK-32915-followup. Lead-authored-by: Min Shen <mshen@linkedin.com> Co-authored-by: Min Shen <victor.nju@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c4aa54e) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Merged to master and branch-3.2, +CC @gengliangwang |
…ush operations This is a follow-up to apache#29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514) In this PR, the following changes are made: 1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse. 2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push. 3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push. To make code cleaner without sacrificing backward compatibility. No Existing unit tests. Closes apache#33340 from Victsm/SPARK-32915-followup. Lead-authored-by: Min Shen <mshen@linkedin.com> Co-authored-by: Min Shen <victor.nju@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c4aa54e)
…ush operations ### What changes were proposed in this pull request? This is a follow-up to apache#29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514) In this PR, the following changes are made: 1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse. 2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push. 3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push. ### Why are the changes needed? To make code cleaner without sacrificing backward compatibility. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes apache#33340 from Victsm/SPARK-32915-followup. Lead-authored-by: Min Shen <mshen@linkedin.com> Co-authored-by: Min Shen <victor.nju@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c4aa54e) Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
…ush operations This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514) In this PR, the following changes are made: 1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse. 2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push. 3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push. To make code cleaner without sacrificing backward compatibility. No Existing unit tests. Closes #33340 from Victsm/SPARK-32915-followup. Lead-authored-by: Min Shen <mshen@linkedin.com> Co-authored-by: Min Shen <victor.nju@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
This is a follow-up to #29855 according to the comments
In this PR, the following changes are made:
BlockPushingListener
interface is created specifically for block push. The existingBlockFetchingListener
interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified underBlockTransferListener
to enable code reuse.RetryingBlockFetcher
,BlockFetchStarter
, andRetryingBlockFetchListener
are renamed toRetryingBlockTransferor
,BlockTransferStarter
, andRetryingBlockTransferListener
respectively. This makes their names more generic to be reused across both block fetch and push.OneForOneBlockPusher
are further clarified to better explain how we handle retries for block push.Why are the changes needed?
To make code cleaner without sacrificing backward compatibility.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing unit tests.