diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index cc7d4dbe00573..9a45f2c895090 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -513,12 +513,18 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } } else { appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> { - if (null == value || msg.shuffleMergeId != value.shuffleMergeId || + if (null == value || msg.shuffleMergeId < value.shuffleMergeId || INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) { throw new RuntimeException(String.format( "Shuffle merge finalize request for shuffle %s with" + " shuffleMergeId %s is %s", msg.shuffleId, msg.shuffleMergeId, ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX)); + } else if (msg.shuffleMergeId > value.shuffleMergeId) { + // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return + // empty MergeStatuses but cleanup the older shuffleMergeId files. + mergedShuffleCleaner.execute(() -> + closeAndDeletePartitionFiles(value.shuffleMergePartitions)); + return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); } else { shuffleMergePartitionsRef.set(value.shuffleMergePartitions); return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 46a1569008ee7..6bf39c8ab5f0a 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -1234,6 +1234,25 @@ void closeAndDeletePartitionFiles(Map partitio pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3)); MergedBlockMeta mergedBlockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 3, 0); validateChunks(testApp, 0, 3, 0, mergedBlockMeta, new int[]{2}, new int[][]{{0}}); + + StreamCallbackWithID stream4 = + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0)); + closed.acquire(); + // Do not finalize shuffleMergeId 4 can happen during stage cancellation. + stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2])); + stream4.onComplete(stream4.getID()); + + // Check whether the data is cleaned up when higher shuffleMergeId finalize request comes + // but no blocks pushed for that shuffleMergeId + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5)); + closed.acquire(); + assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned" + + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists()); + assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned" + + " up", appShuffleInfo.getMergedShuffleIndexFile(0, 4, 0).exists()); + assertFalse("MergedBlock data file for shuffle 0 and shuffleMergeId 4 should be cleaned" + + " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists()); } private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {