Skip to content

Commit

Permalink
[SPARK-36266][SHUFFLE] Rename classes in shuffle RPC used for block p…
Browse files Browse the repository at this point in the history
…ush operations

### What changes were proposed in this pull request?
This is a follow-up to #29855 according to the [comments](https://github.com/apache/spark/pull/29855/files#r505536514)
In this PR, the following changes are made:

1. A new `BlockPushingListener` interface is created specifically for block push. The existing `BlockFetchingListener` interface is left as is, since it might be used by external shuffle solutions. These 2 interfaces are unified under `BlockTransferListener` to enable code reuse.
2. `RetryingBlockFetcher`, `BlockFetchStarter`, and `RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, `BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This makes their names more generic to be reused across both block fetch and push.
3. Comments in `OneForOneBlockPusher` are further clarified to better explain how we handle retries for block push.

### Why are the changes needed?
To make code cleaner without sacrificing backward compatibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #33340 from Victsm/SPARK-32915-followup.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c4aa54e)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
  • Loading branch information
2 people authored and Mridul Muralidharan committed Jul 26, 2021
1 parent ae7b32a commit bbec381
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.spark.network.shuffle;

import java.util.EventListener;

import org.apache.spark.network.buffer.ManagedBuffer;

public interface BlockFetchingListener extends EventListener {
public interface BlockFetchingListener extends BlockTransferListener {
/**
* Called once per successfully fetched block. After this call returns, data will be released
* automatically. If the data will be passed to another thread, the receiver should retain()
Expand All @@ -33,4 +31,19 @@ public interface BlockFetchingListener extends EventListener {
* Called at least once per block upon failures.
*/
void onBlockFetchFailure(String blockId, Throwable exception);

@Override
default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
onBlockFetchSuccess(blockId, data);
}

@Override
default void onBlockTransferFailure(String blockId, Throwable exception) {
onBlockFetchFailure(blockId, exception);
}

@Override
default String getTransferType() {
return "fetch";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import org.apache.spark.network.buffer.ManagedBuffer;

/**
* Callback to handle block push success and failure. This interface and
* {@link BlockFetchingListener} are unified under {@link BlockTransferListener} to allow
* code reuse for handling block push and fetch retry.
*/
public interface BlockPushingListener extends BlockTransferListener {
/**
* Called once per successfully pushed block. After this call returns, data will be released
* automatically. If the data will be passed to another thread, the receiver should retain()
* and release() the buffer on their own, or copy the data to a new buffer.
*/
void onBlockPushSuccess(String blockId, ManagedBuffer data);

/**
* Called at least once per block upon failures.
*/
void onBlockPushFailure(String blockId, Throwable exception);

@Override
default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
onBlockPushSuccess(blockId, data);
}

@Override
default void onBlockTransferFailure(String blockId, Throwable exception) {
onBlockPushFailure(blockId, exception);
}

@Override
default String getTransferType() {
return "push";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void pushBlocks(
int port,
String[] blockIds,
ManagedBuffer[] buffers,
BlockFetchingListener listener) {
BlockPushingListener listener) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.util.EventListener;

import org.apache.spark.network.buffer.ManagedBuffer;

/**
* This interface unifies both {@link BlockFetchingListener} and {@link BlockPushingListener}
* under a single interface to allow code reuse, while also keeping the existing public interface
* to facilitate backward compatibility.
*/
public interface BlockTransferListener extends EventListener {
/**
* Called once per successfully transferred block.
*/
void onBlockTransferSuccess(String blockId, ManagedBuffer data);

/**
* Called at least once per block transfer failures.
*/
void onBlockTransferFailure(String blockId, Throwable exception);

/**
* Return a string indicating the type of the listener such as fetch, push, or something else
*/
String getTransferType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.spark.annotation.Evolving;

/**
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
* Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
* and logged.
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
* Note: {@link RetryingBlockTransferor} will delegate the exception to this handler only when
* - remaining retries < max retries
* - exception is an IOException
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ public void fetchBlocks(
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
int maxRetries = conf.maxIORetries();
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
if (clientFactory != null) {
assert inputListener instanceof BlockFetchingListener :
"Expecting a BlockFetchingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId,
inputBlockId, inputListener, conf, downloadFileManager).start();
new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
(BlockFetchingListener) inputListener, conf, downloadFileManager).start();
} else {
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
Expand All @@ -110,7 +112,7 @@ public void fetchBlocks(
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();
new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start();
} else {
blockFetchStarter.createAndStart(blockIds, listener);
}
Expand All @@ -128,7 +130,7 @@ public void pushBlocks(
int port,
String[] blockIds,
ManagedBuffer[] buffers,
BlockFetchingListener listener) {
BlockPushingListener listener) {
checkInit();
assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";

Expand All @@ -138,23 +140,29 @@ public void pushBlocks(
}
logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
try {
RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
RetryingBlockTransferor.BlockTransferStarter blockPushStarter =
(inputBlockId, inputListener) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
inputListener, buffersWithId).start();
if (clientFactory != null) {
assert inputListener instanceof BlockPushingListener :
"Expecting a BlockPushingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
(BlockPushingListener) inputListener, buffersWithId).start();
} else {
logger.info("This clientFactory was closed. Skipping further block push retries.");
}
};
int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
new RetryingBlockFetcher(
new RetryingBlockTransferor(
conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
} else {
blockPushStarter.createAndStart(blockIds, listener);
}
} catch (Exception e) {
logger.error("Exception while beginning pushBlocks", e);
for (String blockId : blockIds) {
listener.onBlockFetchFailure(blockId, e);
listener.onBlockPushFailure(blockId, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public class OneForOneBlockPusher {
private final String appId;
private final int appAttemptId;
private final String[] blockIds;
private final BlockFetchingListener listener;
private final BlockPushingListener listener;
private final Map<String, ManagedBuffer> buffers;

public OneForOneBlockPusher(
TransportClient client,
String appId,
int appAttemptId,
String[] blockIds,
BlockFetchingListener listener,
BlockPushingListener listener,
Map<String, ManagedBuffer> buffers) {
this.client = client;
this.appId = appId;
Expand All @@ -78,21 +78,34 @@ private class BlockPushCallback implements RpcResponseCallback {
@Override
public void onSuccess(ByteBuffer response) {
// On receipt of a successful block push
listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
}

@Override
public void onFailure(Throwable e) {
// Since block push is best effort, i.e., if we encountered a block push failure that's not
// retriable or exceeding the max retires, we should not fail all remaining block pushes.
// The best effort nature makes block push tolerable of a partial completion. Thus, we only
// fail the block that's actually failed. Not that, on the RetryingBlockFetcher side, once
// retry is initiated, it would still invalidate the previous active retry listener, and
// retry all outstanding blocks. We are preventing forwarding unnecessary block push failures
// to the parent listener of the retry listener. The only exceptions would be if the block
// push failure is due to block arriving on the server side after merge finalization, or the
// client fails to establish connection to the server side. In both cases, we would fail all
// remaining blocks.
// Since block push is best effort, i.e., if we encounter a block push failure that's still
// retriable according to ErrorHandler (not a connection exception and the block is not too
// late), we should not fail all remaining block pushes even though
// RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
// count etc). The best effort nature makes block push tolerable of a partial completion.
// Thus, we only fail the block that's actually failed in this case. Note that, on the
// RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
// previous active retry listener, and retry pushing all outstanding blocks. However, since
// the blocks to be pushed are preloaded into memory and the first attempt of pushing these
// blocks might have already succeeded, retry pushing all the outstanding blocks should be
// very cheap (on the client side, the block data is in memory; on the server side, the block
// will be recognized as a duplicate which triggers noop handling). Here, by failing only the
// one block that's actually failed, we are essentially preventing forwarding unnecessary
// block push failures to the parent listener of the retry listener.
//
// Take the following as an example. For the common exception during block push handling,
// i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
// by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
// one block encountering this issue not the remaining blocks in the same batch. On the
// RetryingBlockTransferor side, since this exception is considered as not retriable, it
// would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
// blocks in the same batch would remain current and active and they won't be impacted by
// this exception.
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
failRemainingBlocks(targetBlockId, e);
Expand All @@ -106,7 +119,7 @@ public void onFailure(Throwable e) {
private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
for (String blockId : failedBlockIds) {
try {
listener.onBlockFetchFailure(blockId, e);
listener.onBlockPushFailure(blockId, e);
} catch (Exception e2) {
logger.error("Error in block push failure callback", e2);
}
Expand Down
Loading

0 comments on commit bbec381

Please sign in to comment.