[SPARK-32915][SHUFFLE] Create and rename classes in shuffle RPC used for block push operations #30513

Closed · wants to merge 3 commits
BlockFetchingListener.java

@@ -17,11 +17,9 @@

 package org.apache.spark.network.shuffle;

-import java.util.EventListener;
-
 import org.apache.spark.network.buffer.ManagedBuffer;

-public interface BlockFetchingListener extends EventListener {
+public interface BlockFetchingListener extends BlockTransferListener {
   /**
    * Called once per successfully fetched block. After this call returns, data will be released
    * automatically. If the data will be passed to another thread, the receiver should retain()
@@ -33,4 +31,14 @@ public interface BlockFetchingListener extends EventListener {
    * Called at least once per block upon failures.
    */
   void onBlockFetchFailure(String blockId, Throwable exception);
+
+  @Override
+  default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
+    onBlockFetchSuccess(blockId, data);
+  }
+
+  @Override
+  default void onBlockTransferFailure(String blockId, Throwable exception) {
+    onBlockFetchFailure(blockId, exception);
+  }
 }
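For context: because the new default methods forward the unified callbacks to the existing fetch callbacks, existing listener implementations keep compiling unchanged. A minimal sketch, not part of this diff:

```java
// Hypothetical example, not from this PR: an existing fetch listener is untouched,
// yet now also satisfies BlockTransferListener through the default methods above.
BlockFetchingListener fetchListener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    data.retain(); // hand the buffer to another thread, which must release() it
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    // per-block failure handling
  }
};

// Generic code can now go through the unified interface; the default method
// forwards this call to onBlockFetchSuccess.
BlockTransferListener transferListener = fetchListener;
```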
BlockPushingListener.java (new file)

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+public interface BlockPushingListener extends BlockTransferListener {
+  /**
+   * Called once per successfully pushed block. After this call returns, data will be released
+   * automatically. If the data will be passed to another thread, the receiver should retain()
+   * and release() the buffer on their own, or copy the data to a new buffer.
+   */
+  void onBlockPushSuccess(String blockId, ManagedBuffer data);
+
+  /**
+   * Called at least once per block upon failures.
+   */
+  void onBlockPushFailure(String blockId, Throwable exception);
+
+  @Override
+  default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
+    onBlockPushSuccess(blockId, data);
+  }
+
+  @Override
+  default void onBlockTransferFailure(String blockId, Throwable exception) {
+    onBlockPushFailure(blockId, exception);
+  }
+}
BlockStoreClient.java

@@ -153,7 +153,7 @@ public void pushBlocks(
       int port,
       String[] blockIds,
       ManagedBuffer[] buffers,
-      BlockFetchingListener listener) {
+      BlockPushingListener listener) {
     throw new UnsupportedOperationException();
   }
 }
BlockTransferListener.java (new file)

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * This interface unifies both {@link BlockFetchingListener} and {@link BlockPushingListener}
+ * under a single interface to allow code reuse, while also keeping the existing public interface
+ * to facilitate backward compatibility.
+ */
+public interface BlockTransferListener extends EventListener {
+  /**
+   * Called once per successfully transferred block.
+   */
+  void onBlockTransferSuccess(String blockId, ManagedBuffer data);
+
+  /**
+   * Called at least once per block upon transfer failures.
+   */
+  void onBlockTransferFailure(String blockId, Throwable exception);
+}
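Likewise, push-side callers implement only the push callbacks; retry machinery that sees a plain BlockTransferListener still reaches them through the defaults. A hedged sketch (the class below is hypothetical, not from this PR):

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.network.buffer.ManagedBuffer;

// Hypothetical listener, for illustration only.
public class CountingPushListener implements BlockPushingListener {
  private final AtomicInteger pushedBlocks = new AtomicInteger();

  @Override
  public void onBlockPushSuccess(String blockId, ManagedBuffer data) {
    pushedBlocks.incrementAndGet();
  }

  @Override
  public void onBlockPushFailure(String blockId, Throwable exception) {
    // Block push is best effort: record and move on rather than failing the batch.
  }

  public int pushedCount() {
    return pushedBlocks.get();
  }
}
```

Calling onBlockTransferSuccess on this object lands in onBlockPushSuccess via the default method in BlockPushingListener.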
ErrorHandler.java

@@ -24,9 +24,9 @@

 import org.apache.spark.annotation.Evolving;

 /**
- * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
  * and logged.
- * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * Note: {@link RetryingBlockTransferor} will delegate the exception to this handler only when
  * - remaining retries < max retries
  * - exception is an IOException
  *
ExternalBlockStoreClient.java

@@ -94,13 +94,13 @@ public void fetchBlocks(
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       int maxRetries = conf.maxIORetries();
-      RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
+      RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
           (inputBlockId, inputListener) -> {
             // Unless this client is closed.
             if (clientFactory != null) {
               TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
-              new OneForOneBlockFetcher(client, appId, execId,
-                inputBlockId, inputListener, conf, downloadFileManager).start();
+              new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
+                (BlockFetchingListener) inputListener, conf, downloadFileManager).start();
             } else {
               logger.info("This clientFactory was closed. Skipping further block fetch retries.");
             }
@@ -109,7 +109,7 @@
       if (maxRetries > 0) {
         // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
         // a bug in this code. We should remove the if statement once we're sure of the stability.
-        new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();
+        new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start();
       } else {
         blockFetchStarter.createAndStart(blockIds, listener);
       }
@@ -127,7 +127,7 @@ public void pushBlocks(
       int port,
       String[] blockIds,
       ManagedBuffer[] buffers,
-      BlockFetchingListener listener) {
+      BlockPushingListener listener) {
     checkInit();
     assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";

@@ -137,23 +137,23 @@ public void pushBlocks(
     }
     logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, port);
     try {
-      RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
+      RetryingBlockTransferor.BlockTransferStarter blockPushStarter =
           (inputBlockId, inputListener) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId)
-              .start();
+            new OneForOneBlockPusher(client, appId, inputBlockId,
+              (BlockPushingListener) inputListener, buffersWithId).start();
           };
       int maxRetries = conf.maxIORetries();
       if (maxRetries > 0) {
-        new RetryingBlockFetcher(
+        new RetryingBlockTransferor(
             conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
       } else {
         blockPushStarter.createAndStart(blockIds, listener);
       }
     } catch (Exception e) {
       logger.error("Exception while beginning pushBlocks", e);
       for (String blockId : blockIds) {
-        listener.onBlockFetchFailure(blockId, e);
+        listener.onBlockPushFailure(blockId, e);
       }
     }
   }
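A hedged usage sketch of the renamed push path. The `client` and `logger` objects, the host, port, and block id values are all made up, and a leading `String host` parameter (elided above this hunk but implied by `clientFactory.createClient(host, port)`) is assumed:

```java
// Hedged sketch: assumes pushBlocks(String host, int port, String[] blockIds,
// ManagedBuffer[] buffers, BlockPushingListener listener), consistent with the
// parameters visible in this hunk.
ManagedBuffer[] buffers = new ManagedBuffer[] {
    new NioManagedBuffer(ByteBuffer.wrap(new byte[] {1, 2, 3}))
};
String[] blockIds = new String[] {"shufflePush_0_0_0_0"}; // hypothetical id

client.pushBlocks("shuffle-host.example.com", 7337, blockIds, buffers,
    new BlockPushingListener() {
      @Override
      public void onBlockPushSuccess(String blockId, ManagedBuffer data) {
        logger.info("Pushed block {}", blockId);
      }

      @Override
      public void onBlockPushFailure(String blockId, Throwable exception) {
        // Best effort: log and continue; other blocks in the batch are unaffected.
        logger.warn("Failed to push block {}", blockId, exception);
      }
    });
```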
OneForOneBlockPusher.java

@@ -46,14 +46,14 @@ public class OneForOneBlockPusher {
   private final TransportClient client;
   private final String appId;
   private final String[] blockIds;
-  private final BlockFetchingListener listener;
+  private final BlockPushingListener listener;
   private final Map<String, ManagedBuffer> buffers;

   public OneForOneBlockPusher(
       TransportClient client,
       String appId,
       String[] blockIds,
-      BlockFetchingListener listener,
+      BlockPushingListener listener,
       Map<String, ManagedBuffer> buffers) {
     this.client = client;
     this.appId = appId;
@@ -75,21 +75,34 @@ private class BlockPushCallback implements RpcResponseCallback {
     @Override
     public void onSuccess(ByteBuffer response) {
       // On receipt of a successful block push
-      listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+      listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
     }

     @Override
     public void onFailure(Throwable e) {
-      // Since block push is best effort, i.e., if we encountered a block push failure that's not
-      // retriable or exceeding the max retries, we should not fail all remaining block pushes.
-      // The best-effort nature makes block push tolerant of partial completion. Thus, we only
-      // fail the block that actually failed. Note that, on the RetryingBlockFetcher side, once
-      // retry is initiated, it would still invalidate the previous active retry listener, and
-      // retry all outstanding blocks. We are preventing forwarding unnecessary block push failures
-      // to the parent listener of the retry listener. The only exceptions would be if the block
-      // push failure is due to the block arriving on the server side after merge finalization, or
-      // the client failing to establish a connection to the server side. In both cases, we would
-      // fail all remaining blocks.
+      // Since block push is best effort, if we encounter a block push failure that's still
+      // retriable according to ErrorHandler (not a connection exception and the block is not
+      // too late), we should not fail all remaining block pushes, even though
+      // RetryingBlockTransferor might consider this failure not retriable (exceeding the max
+      // retry count, etc.). The best-effort nature makes block push tolerant of partial
+      // completion. Thus, we only fail the block that actually failed in this case. Note that,
+      // on the RetryingBlockTransferor side, if retry is initiated, it would still invalidate
+      // the previous active retry listener and retry pushing all outstanding blocks. However,
+      // since the blocks to be pushed are preloaded into memory and the first attempt of pushing
+      // these blocks might have already succeeded, retrying all the outstanding blocks should be
+      // very cheap (on the client side, the block data is in memory; on the server side, the
+      // block will be recognized as a duplicate, which triggers noop handling). Here, by failing
+      // only the one block that actually failed, we are essentially preventing unnecessary block
+      // push failures from being forwarded to the parent listener of the retry listener.
+      //
+      // Take the following as an example. The common exception during block push handling,
+      // i.e. block collision, is considered retriable by ErrorHandler but not retriable by
+      // RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
+      // one block encountering this issue, not the remaining blocks in the same batch. On the
+      // RetryingBlockTransferor side, since this exception is considered not retriable, it
+      // would immediately invoke the parent listener's onBlockTransferFailure. However, the
+      // remaining blocks in the same batch remain current and active, and they won't be
+      // impacted by this exception.
       if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
         String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
         failRemainingBlocks(targetBlockId, e);
@@ -103,7 +116,7 @@ public void onFailure(Throwable e) {
   private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
     for (String blockId : failedBlockIds) {
       try {
-        listener.onBlockFetchFailure(blockId, e);
+        listener.onBlockPushFailure(blockId, e);
       } catch (Exception e2) {
         logger.error("Error in block push failure callback", e2);
       }
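The policy the long comment describes boils down to a two-level check. The sketch below restates the branch shown above; the else branch is assumed (it sits below the visible hunk) and is written to match the comment's description of connection failures and too-late blocks:

```java
// Sketch of the failure-handling policy described in the comment above.
// Only the if branch appears in this hunk; the else branch is an assumption.
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
  // Retriable per the push ErrorHandler (e.g. a block collision): fail only
  // the block that actually failed; the rest of the batch stays active.
  failRemainingBlocks(Arrays.copyOfRange(blockIds, index, index + 1), e);
} else {
  // Not retriable (e.g. connection failure, or the block arrived after merge
  // finalization): fail this block and every remaining block in the batch.
  failRemainingBlocks(Arrays.copyOfRange(blockIds, index, blockIds.length), e);
}
```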