Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32923][CORE][SHUFFLE] Handle indeterminate stage retries for push-based shuffle #33034

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
54c80b7
SPARK-32923 initial incomplete changes
venkata91 Jun 18, 2021
1dd1c8a
fix RemotePushBlockResolver
venkata91 Jul 14, 2021
18ad183
Major changes to ShuffleBlockFetcherIteratorSUite
venkata91 Jul 15, 2021
0343cbd
testing changes
venkata91 Jul 15, 2021
afbbf50
additional changes
venkata91 Jul 17, 2021
4bbc4c7
Added tests in DAGSchedulerSuite
venkata91 Jul 20, 2021
6f7272c
some cleanups
venkata91 Jul 20, 2021
ede3103
Changes to OneForOneBlockFetcher and OneForOneBlockFetcherSuite
venkata91 Jul 21, 2021
23e441b
Change ShufflePushBlockId to ShuffleMergedBlockId
venkata91 Jul 21, 2021
7d42827
refactor OneForOneBlockFetcher
venkata91 Jul 21, 2021
feb18bd
additional tests
venkata91 Jul 22, 2021
fb9fd30
Address ngone51 review comments
venkata91 Jul 22, 2021
1f00794
fix flaky test
venkata91 Jul 23, 2021
ebd8c89
rename shuffleSequenceId to shuffleMergeId
venkata91 Jul 23, 2021
7032f45
minor fixes and added tests to BlockIdSuite
venkata91 Jul 25, 2021
c3dd02e
Address review comments from mridulm
venkata91 Jul 25, 2021
2de94d2
changes to RemoteBlockPushResolver
venkata91 Jul 25, 2021
82eebe6
fix style check issues in java
venkata91 Jul 26, 2021
1c2e869
Address review comments from mridulm
venkata91 Jul 27, 2021
d3f63ed
Address some more review comments
venkata91 Jul 27, 2021
d015b51
Address ngone51 review comments
venkata91 Jul 27, 2021
3412af1
refactor RemoteBlockPushResolver
venkata91 Jul 28, 2021
0c57037
changes to tests and add fetch error handler
venkata91 Jul 28, 2021
a8510b8
Add a custom exception for stale block push
venkata91 Jul 28, 2021
2e3416b
Address mridulm review comments
venkata91 Jul 28, 2021
62ca9ae
Address remaining review comments
venkata91 Jul 28, 2021
683f98d
Address review comments
venkata91 Jul 29, 2021
4a43d1d
Address mridulm, ngone51, zhouyejoe review comments
venkata91 Jul 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,15 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
*
* @param appId applicationId.
* @param shuffleId shuffle id.
* @param shuffleMergeId uniquely identifies the merge process of the shuffle for an
* indeterminate stage attempt.
* @param reduceId reduce id.
* @param callback callback to handle the reply.
*/
public void sendMergedBlockMetaReq(
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId,
MergedBlockMetaResponseCallback callback) {
long requestId = requestId();
Expand All @@ -222,7 +225,8 @@ public void sendMergedBlockMetaReq(
handler.addRpcRequest(requestId, callback);
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(
new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId)).addListener(listener);
new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleMergeId,
reduceId)).addListener(listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ public class MergedBlockMetaRequest extends AbstractMessage implements RequestMe
public final long requestId;
public final String appId;
public final int shuffleId;
public final int shuffleMergeId;
public final int reduceId;

public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int reduceId) {
public MergedBlockMetaRequest(
long requestId,
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId) {
super(null, false);
this.requestId = requestId;
this.appId = appId;
this.shuffleId = shuffleId;
this.shuffleMergeId = shuffleMergeId;
this.reduceId = reduceId;
}

Expand All @@ -49,36 +56,39 @@ public Type type() {

@Override
public int encodedLength() {
return 8 + Encoders.Strings.encodedLength(appId) + 4 + 4;
return 8 + Encoders.Strings.encodedLength(appId) + 4 + 4 + 4;
}

@Override
public void encode(ByteBuf buf) {
buf.writeLong(requestId);
Encoders.Strings.encode(buf, appId);
buf.writeInt(shuffleId);
buf.writeInt(shuffleMergeId);
buf.writeInt(reduceId);
}

public static MergedBlockMetaRequest decode(ByteBuf buf) {
long requestId = buf.readLong();
String appId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int shuffleMergeId = buf.readInt();
int reduceId = buf.readInt();
return new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId);
return new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleMergeId, reduceId);
}

@Override
public int hashCode() {
return Objects.hashCode(requestId, appId, shuffleId, reduceId);
return Objects.hashCode(requestId, appId, shuffleId, shuffleMergeId, reduceId);
}

@Override
public boolean equals(Object other) {
if (other instanceof MergedBlockMetaRequest) {
MergedBlockMetaRequest o = (MergedBlockMetaRequest) other;
return requestId == o.requestId && shuffleId == o.shuffleId && reduceId == o.reduceId
&& Objects.equal(appId, o.appId);
return requestId == o.requestId && shuffleId == o.shuffleId &&
shuffleMergeId == o.shuffleMergeId && reduceId == o.reduceId &&
Objects.equal(appId, o.appId);
}
return false;
}
Expand All @@ -89,6 +99,7 @@ public String toString() {
.append("requestId", requestId)
.append("appId", appId)
.append("shuffleId", shuffleId)
.append("shuffleMergeId", shuffleMergeId)
.append("reduceId", reduceId)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
TransportClient reverseClient = mock(TransportClient.class);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
rpcHandler, 2L, null);
MergedBlockMetaRequest validMetaReq = new MergedBlockMetaRequest(19, "app1", 0, 0);
MergedBlockMetaRequest validMetaReq = new MergedBlockMetaRequest(19, "app1", 0, 0, 0);
requestHandler.handle(validMetaReq);
assertEquals(1, responseAndPromisePairs.size());
assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof MergedBlockMetaSuccess);
assertEquals(2,
((MergedBlockMetaSuccess) (responseAndPromisePairs.get(0).getLeft())).getNumChunks());

MergedBlockMetaRequest invalidMetaReq = new MergedBlockMetaRequest(21, "app1", -1, 1);
MergedBlockMetaRequest invalidMetaReq = new MergedBlockMetaRequest(21, "app1", -1, 0, 1);
requestHandler.handle(invalidMetaReq);
assertEquals(2, responseAndPromisePairs.size());
assertTrue(responseAndPromisePairs.get(1).getLeft() instanceof RpcFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public void pushBlocks(
* @param host host of shuffle server
* @param port port of shuffle server.
* @param shuffleId shuffle ID of the shuffle to be finalized
* @param shuffleMergeId uniquely identifies the merge process of the shuffle for an
* indeterminate stage attempt.
* @param listener the listener to receive MergeStatuses
*
* @since 3.1.0
Expand All @@ -175,6 +177,7 @@ public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
int shuffleMergeId,
MergeFinalizerListener listener) {
throw new UnsupportedOperationException();
}
Expand All @@ -185,6 +188,8 @@ public void finalizeShuffleMerge(
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param shuffleId shuffle id.
* @param shuffleMergeId uniquely identifies the merge process of the shuffle for an
* indeterminate stage attempt.
* @param reduceId reduce id.
* @param listener the listener to receive chunk counts.
*
Expand All @@ -194,6 +199,7 @@ public void getMergedBlockMeta(
String host,
int port,
int shuffleId,
int shuffleMergeId,
int reduceId,
MergedBlocksMetaListener listener) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ default boolean shouldLogError(Throwable t) {
class BlockPushErrorHandler implements ErrorHandler {
/**
* String constant used for generating exception messages indicating a block to be merged
* arrives too late on the server side, and also for later checking such exceptions on the
* client side. When we get a block push failure because of the block arrives too late, we
* will not retry pushing the block nor log the exception on the client side.
* arrives too late, or is a stale block push in the case of indeterminate stage retries, on
* the server side, and also for later checking such exceptions on the client side. When we
* get a block push failure because the block push is stale or arrives too late, we will
* neither retry pushing the block nor log the exception on the client side.
*/
public static final String TOO_LATE_MESSAGE_SUFFIX =
"received after merged shuffle is finalized";
public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
"received after merged shuffle is finalized or stale block push as shuffle blocks of a"
+ " higher shuffleMergeId for the shuffle is being pushed";

/**
* String constant used for generating exception messages indicating the server couldn't
Expand All @@ -81,25 +83,54 @@ class BlockPushErrorHandler implements ErrorHandler {
public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
"IOExceptions exceeded the threshold";

/**
* String constant used for generating exception messages indicating that the server
* rejected a shuffle finalize request because shuffle blocks of a higher shuffleMergeId
* for the shuffle are already being pushed. This typically happens with indeterminate
* stage retries, where the entire shuffle output must be rolled back if a stage attempt
* fails. For more details, refer to SPARK-23243, SPARK-25341 and SPARK-32923.
*/
public static final String STALE_SHUFFLE_FINALIZE_SUFFIX =
"stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the"
+ " shuffle is already being pushed";

@Override
public boolean shouldRetryError(Throwable t) {
// If it is a connection time-out or a connection closed exception, no need to retry.
// If it is a FileNotFoundException originating from the client while pushing the shuffle
// blocks to the server, even then there is no need to retry. We will still log this exception
// once which helps with debugging.
// blocks to the server, even then there is no need to retry. We will still log this
// exception once which helps with debugging.
if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
t.getCause() instanceof FileNotFoundException)) {
return false;
}
// If the block is too late, there is no need to retry it
return !Throwables.getStackTraceAsString(t).contains(TOO_LATE_MESSAGE_SUFFIX);

String errorStackTrace = Throwables.getStackTraceAsString(t);
// If the block is too late or stale block push, there is no need to retry it
return !errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
}

@Override
public boolean shouldLogError(Throwable t) {
String errorStackTrace = Throwables.getStackTraceAsString(t);
return !errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) &&
!errorStackTrace.contains(TOO_LATE_MESSAGE_SUFFIX);
return !(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
}
}

/**
 * {@link ErrorHandler} that recognises stale shuffle block fetch failures. A failure whose
 * stack trace text contains {@link #STALE_SHUFFLE_BLOCK_FETCH} is neither retried nor logged;
 * every other failure is both retried and logged.
 */
class BlockFetchErrorHandler implements ErrorHandler {
  /** Marker text searched for in a failure's stack trace to detect a stale fetch request. */
  public static final String STALE_SHUFFLE_BLOCK_FETCH =
    "stale shuffle block fetch request as shuffle blocks of a higher shuffleMergeId for the"
      + " shuffle is available";

  /**
   * @param t the failure to inspect
   * @return {@code false} when the failure is a stale block fetch, {@code true} otherwise
   */
  @Override
  public boolean shouldRetryError(Throwable t) {
    return !isStaleBlockFetch(t);
  }

  /**
   * @param t the failure to inspect
   * @return {@code false} when the failure is a stale block fetch, {@code true} otherwise
   */
  @Override
  public boolean shouldLogError(Throwable t) {
    return !isStaleBlockFetch(t);
  }

  /** Returns true when the stack trace text of {@code t} contains the stale-fetch marker. */
  private static boolean isStaleBlockFetch(Throwable t) {
    String stackTraceText = Throwables.getStackTraceAsString(t);
    return stackTraceText.contains(STALE_SHUFFLE_BLOCK_FETCH);
  }
}
}
Loading