Skip to content

Commit

Permalink
[SPARK-32923][FOLLOW-UP] Clean up older shuffleMergeId shuffle files …
Browse files Browse the repository at this point in the history
…when finalize request for higher shuffleMergeId is received

### What changes were proposed in this pull request?

Clean up older shuffleMergeId shuffle files when finalize request for higher shuffleMergeId is received when no blocks pushed for the corresponding shuffleMergeId. This is identified as part of #33034 (comment).

### Why are the changes needed?

Without this change, older shuffleMergeId files won't be cleaned up properly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added changes to existing unit test to address this case.

Closes #33605 from venkata91/SPARK-32923-follow-on.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
venkata91 authored and wakun committed Jul 30, 2022
1 parent 8ba68f2 commit 8ab806a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -513,12 +513,18 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
}
} else {
appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
if (null == value || msg.shuffleMergeId != value.shuffleMergeId ||
if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
throw new RuntimeException(String.format(
"Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s",
msg.shuffleId, msg.shuffleMergeId,
ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
} else if (msg.shuffleMergeId > value.shuffleMergeId) {
// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
// empty MergeStatuses but cleanup the older shuffleMergeId files.
mergedShuffleCleaner.execute(() ->
closeAndDeletePartitionFiles(value.shuffleMergePartitions));
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
} else {
shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,25 @@ void closeAndDeletePartitionFiles(Map<Integer, AppShufflePartitionInfo> partitio
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3));
MergedBlockMeta mergedBlockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 3, 0);
validateChunks(testApp, 0, 3, 0, mergedBlockMeta, new int[]{2}, new int[][]{{0}});

StreamCallbackWithID stream4 =
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
closed.acquire();
// Do not finalize shuffleMergeId 4 can happen during stage cancellation.
stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
stream4.onComplete(stream4.getID());

// Check whether the data is cleaned up when higher shuffleMergeId finalize request comes
// but no blocks pushed for that shuffleMergeId
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5));
closed.acquire();
assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned"
+ " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned"
+ " up", appShuffleInfo.getMergedShuffleIndexFile(0, 4, 0).exists());
assertFalse("MergedBlock data file for shuffle 0 and shuffleMergeId 4 should be cleaned"
+ " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
}

private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
Expand Down

0 comments on commit 8ab806a

Please sign in to comment.