Skip to content

Commit

Permalink
[SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app at…
Browse files Browse the repository at this point in the history
…tempts are enabled and manage concurrent access to the state in a better way

This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.

When an executor registers with the shuffle service, it encodes the merged shuffle directory it created and the application attempt ID as JSON in the ShuffleManagerMeta. The shuffle service then decodes the JSON string to obtain the correct merged shuffle directory and the attempt ID. If the registration comes from a newer attempt, the merged shuffle information is updated to store the information from the newer attempt.

This PR also refactored the management of the merged shuffle information to avoid concurrency issues.
Refer to the SPIP in SPARK-30602.

No.

Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Closes apache#33078 from zhouyejoe/SPARK-35546.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit c77acf0)
  • Loading branch information
zhouyejoe committed Aug 2, 2021
1 parent be3f5f8 commit b5a5594
Show file tree
Hide file tree
Showing 16 changed files with 810 additions and 375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,4 +419,11 @@ public long mergedIndexCacheSize() {
public int ioExceptionsThresholdDuringMerge() {
return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
}

/**
* The application attemptID assigned from Hadoop YARN, read from the
* "spark.app.attempt.id" configuration entry.
*
* @return the value of "spark.app.attempt.id"; -1 is supplied as the fallback value
*/
public int appAttemptId() {
return conf.getInt("spark.app.attempt.id", -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public void pushBlocks(
RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
(inputBlockId, inputListener) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId)
.start();
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
inputListener, buffersWithId).start();
};
int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
Expand All @@ -168,7 +168,8 @@ public void finalizeShuffleMerge(
checkInit();
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
ByteBuffer finalizeShuffleMerge =
new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId).toByteBuffer();
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,21 @@ public class OneForOneBlockPusher {

private final TransportClient client;
private final String appId;
private final int appAttemptId;
private final String[] blockIds;
private final BlockFetchingListener listener;
private final Map<String, ManagedBuffer> buffers;

public OneForOneBlockPusher(
TransportClient client,
String appId,
int appAttemptId,
String[] blockIds,
BlockFetchingListener listener,
Map<String, ManagedBuffer> buffers) {
this.client = client;
this.appId = appId;
this.appAttemptId = appAttemptId;
this.blockIds = blockIds;
this.listener = listener;
this.buffers = buffers;
Expand Down Expand Up @@ -123,8 +126,9 @@ public void start() {
throw new IllegalArgumentException(
"Unexpected shuffle push block id format: " + blockIds[i]);
}
ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer();
ByteBuffer header =
new PushBlockStream(appId, appAttemptId, Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer();
client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
new BlockPushCallback(i, blockIds[i]));
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ public class ExecutorShuffleInfo implements Encodable {
public final String[] localDirs;
/** Number of subdirectories created within each localDir. */
public final int subDirsPerLocalDir;
/** Shuffle manager (SortShuffleManager) that the executor is using. */
/**
* Shuffle manager (SortShuffleManager) that the executor is using.
* If this string contains colon, it will also include the meta information
* for push based shuffle in JSON format. Example of the string with colon would be:
* SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
*/
public final String shuffleManager;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
*/
public class FinalizeShuffleMerge extends BlockTransferMessage {
public final String appId;
public final int appAttemptId;
public final int shuffleId;

/**
* Creates a message carrying the given application ID, application attempt ID and
* shuffle ID.
*
* @param appId the application ID stored in {@link #appId}
* @param appAttemptId the application attempt ID stored in {@link #appAttemptId}
* @param shuffleId the shuffle ID stored in {@link #shuffleId}
*/
public FinalizeShuffleMerge(
String appId,
int appAttemptId,
int shuffleId) {
this.appId = appId;
this.appAttemptId = appAttemptId;
this.shuffleId = shuffleId;
}

Expand All @@ -48,13 +51,14 @@ protected BlockTransferMessage.Type type() {

@Override
public int hashCode() {
return Objects.hashCode(appId, shuffleId);
return Objects.hashCode(appId, appAttemptId, shuffleId);
}

/**
* Renders appId, appAttemptId (labelled "attemptId") and shuffleId using
* {@code ToStringStyle.SHORT_PREFIX_STYLE}.
*/
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("appId", appId)
.append("attemptId", appAttemptId)
.append("shuffleId", shuffleId)
.toString();
}
Expand All @@ -64,25 +68,28 @@ public boolean equals(Object other) {
// Two messages are equal only when appId, appAttemptId and shuffleId all match.
if (other != null && other instanceof FinalizeShuffleMerge) {
FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
return Objects.equal(appId, o.appId)
// Compare against the other instance's attempt ID. Comparing the field with
// itself is always true and would make different attempts look equal,
// which disagrees with hashCode().
&& appAttemptId == o.appAttemptId
&& shuffleId == o.shuffleId;
}
return false;
}

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId) + 4;
return Encoders.Strings.encodedLength(appId) + 4 + 4;
}

/**
* Writes this message into {@code buf}: the appId string first, then appAttemptId and
* shuffleId as ints. The write order must match the read order in decode().
*/
@Override
public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
buf.writeInt(appAttemptId);
buf.writeInt(shuffleId);
}

public static FinalizeShuffleMerge decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
int attemptId = buf.readInt();
int shuffleId = buf.readInt();
return new FinalizeShuffleMerge(appId, shuffleId);
return new FinalizeShuffleMerge(appId, attemptId, shuffleId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

Expand All @@ -34,15 +35,23 @@
*/
public class PushBlockStream extends BlockTransferMessage {
public final String appId;
public final int appAttemptId;
public final int shuffleId;
public final int mapIndex;
public final int reduceId;
// Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
// blocks to be pushed.
public final int index;

public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) {
public PushBlockStream(
String appId,
int appAttemptId,
int shuffleId,
int mapIndex,
int reduceId,
int index) {
this.appId = appId;
this.appAttemptId = appAttemptId;
this.shuffleId = shuffleId;
this.mapIndex = mapIndex;
this.reduceId = reduceId;
Expand All @@ -56,13 +65,14 @@ protected Type type() {

@Override
public int hashCode() {
return Objects.hashCode(appId, shuffleId, mapIndex , reduceId, index);
return Objects.hashCode(appId, appAttemptId, shuffleId, mapIndex , reduceId, index);
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("appId", appId)
.append("attemptId", appAttemptId)
.append("shuffleId", shuffleId)
.append("mapIndex", mapIndex)
.append("reduceId", reduceId)
Expand All @@ -75,6 +85,7 @@ public boolean equals(Object other) {
if (other != null && other instanceof PushBlockStream) {
PushBlockStream o = (PushBlockStream) other;
return Objects.equal(appId, o.appId)
&& appAttemptId == o.appAttemptId
&& shuffleId == o.shuffleId
&& mapIndex == o.mapIndex
&& reduceId == o.reduceId
Expand All @@ -85,12 +96,13 @@ public boolean equals(Object other) {

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId) + 16;
return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4 + 4;
}

@Override
public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
buf.writeInt(appAttemptId);
buf.writeInt(shuffleId);
buf.writeInt(mapIndex);
buf.writeInt(reduceId);
Expand All @@ -99,10 +111,11 @@ public void encode(ByteBuf buf) {

public static PushBlockStream decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
int attemptId = buf.readInt();
int shuffleId = buf.readInt();
int mapIdx = buf.readInt();
int reduceId = buf.readInt();
int index = buf.readInt();
return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index);
return new PushBlockStream(appId, attemptId, shuffleId, mapIdx, reduceId, index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void testBadMessages() {
public void testFinalizeShuffleMerge() throws IOException {
RpcResponseCallback callback = mock(RpcResponseCallback.class);

FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0);
FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 1, 0);
RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2);
MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap},
new int[]{3}, new long[]{30});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testPushOne() {
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));

verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
}
Expand All @@ -67,9 +67,9 @@ public void testPushThree() {
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));
Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));

verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any());
Expand All @@ -87,9 +87,9 @@ public void testServerFailures() {
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));

verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
Expand All @@ -107,9 +107,9 @@ public void testHandlingRetriableFailures() {
BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 0, 2, 0, 2)));

verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
Expand All @@ -130,7 +130,7 @@ private static BlockFetchingListener pushBlocks(
TransportClient client = mock(TransportClient.class);
BlockFetchingListener listener = mock(BlockFetchingListener.class);
OneForOneBlockPusher pusher =
new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks);
new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks);

Iterator<Map.Entry<String, ManagedBuffer>> blockIterator = blocks.entrySet().iterator();
Iterator<BlockTransferMessage> msgIterator = expectMessages.iterator();
Expand Down
Loading

0 comments on commit b5a5594

Please sign in to comment.