From fab51d40c1288db6b6ddf5382a70328b1772de24 Mon Sep 17 00:00:00 2001 From: Ye Zhou Date: Fri, 16 Jul 2021 14:52:47 -0700 Subject: [PATCH] Address comments, add unit test --- .../shuffle/RemoteBlockPushResolver.java | 6 +-- .../protocol/FinalizeShuffleMerge.java | 2 +- .../shuffle/protocol/PushBlockStream.java | 1 + .../shuffle/RemoteBlockPushResolverSuite.java | 43 ++++++++++++++++--- 4 files changed, 41 insertions(+), 11 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 790a4339497ba..29e2d2adffccf 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -247,11 +247,7 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI @Override public String[] getMergedBlockDirs(String appId) { AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); - String[] activeLocalDirs = - Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs, - "application " + appId + " active local dirs list has not been updated " + - "by any executor registration"); - return activeLocalDirs; + return appShuffleInfo.appPathsInfo.activeLocalDirs; } @Override diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java index 92c95ca9045ad..f6ab78b1ab7d4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java @@ -76,7 +76,7 @@ public boolean equals(Object other) { @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId) + 8; + return Encoders.Strings.encodedLength(appId) + 4 + 4; } @Override diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java index 7726f5db319aa..b35ac33a6020b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; + import org.apache.spark.network.protocol.Encoders; // Needed by ScalaDoc. See SPARK-7726 diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index c033460237353..32836c93c1f6a 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -916,7 +916,7 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws } @Test(expected = IllegalArgumentException.class) - public void testBlockReceivedAfterNewAttemptRegistered() + public void testPushBlockFromPreviousAttemptIsRejected() throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf) { @@ -928,7 +928,7 @@ void closeAndDeletePartitionFilesIfNeeded( closed.release(); } }; - String testApp = "updateLocalDirsTwiceWithTwoAttempts"; + String testApp = "testPushBlockFromPreviousAttemptIsRejected"; Path[] attempt1LocalDirs = createLocalDirs(1); registerExecutor(testApp, prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), @@ -982,13 +982,46 @@ void closeAndDeletePartitionFilesIfNeeded( assertEquals( "The attempt id 1 in this PushBlockStream message does not match " + "with the current attempt id 2 stored in shuffle service for application " + - "updateLocalDirsTwiceWithTwoAttempts", re.getMessage()); + testApp, re.getMessage()); throw re; } } + @Test(expected = IllegalArgumentException.class) + public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted() + throws IOException, InterruptedException { + String testApp = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + try { + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, ATTEMPT_ID_1, 0)); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + String.format("The attempt id %s in this FinalizeShuffleMerge message does not " + + "match with the current attempt id %s stored in shuffle service for application %s", + ATTEMPT_ID_1, ATTEMPT_ID_2, testApp)); + throw e; + } + } + @Test(expected = ClosedChannelException.class) - public void testPushBlockStreamCallBackWhileNewAttemptRegistered() + public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted() throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf) { @@ -1000,7 +1033,7 @@ void closeAndDeletePartitionFilesIfNeeded( closed.release(); } }; - String testApp = "testPushBlockStreamCallBackWhileNewAttemptRegisters"; + String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted"; Path[] attempt1LocalDirs = createLocalDirs(1); registerExecutor(testApp, prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),