Skip to content

Commit

Permalink
Address comments, add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyejoe committed Jul 16, 2021
1 parent 1a2eb32 commit fab51d4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,7 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
@Override
public String[] getMergedBlockDirs(String appId) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
String[] activeLocalDirs =
Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs,
"application " + appId + " active local dirs list has not been updated " +
"by any executor registration");
return activeLocalDirs;
return appShuffleInfo.appPathsInfo.activeLocalDirs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean equals(Object other) {

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId) + 8;
return Encoders.Strings.encodedLength(appId) + 4 + 4;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import org.apache.spark.network.protocol.Encoders;

// Needed by ScalaDoc. See SPARK-7726
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
}

@Test(expected = IllegalArgumentException.class)
public void testBlockReceivedAfterNewAttemptRegistered()
public void testPushBlockFromPreviousAttemptIsRejected()
throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
Expand All @@ -928,7 +928,7 @@ void closeAndDeletePartitionFilesIfNeeded(
closed.release();
}
};
String testApp = "updateLocalDirsTwiceWithTwoAttempts";
String testApp = "testPushBlockFromPreviousAttemptIsRejected";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
Expand Down Expand Up @@ -982,13 +982,46 @@ void closeAndDeletePartitionFilesIfNeeded(
assertEquals(
"The attempt id 1 in this PushBlockStream message does not match " +
"with the current attempt id 2 stored in shuffle service for application " +
"updateLocalDirsTwiceWithTwoAttempts", re.getMessage());
testApp, re.getMessage());
throw re;
}
}

@Test(expected = IllegalArgumentException.class)
public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
throws IOException, InterruptedException {
String testApp = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
MERGE_DIRECTORY_META_1);
ByteBuffer[] blocks = new ByteBuffer[]{
ByteBuffer.wrap(new byte[4]),
ByteBuffer.wrap(new byte[5])
};
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, 1, 0, 0, 0, 0));
for (ByteBuffer block : blocks) {
stream1.onData(stream1.getID(), block);
}
stream1.onComplete(stream1.getID());
Path[] attempt2LocalDirs = createLocalDirs(2);
registerExecutor(testApp,
prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
MERGE_DIRECTORY_META_2);
try {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, ATTEMPT_ID_1, 0));
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
String.format("The attempt id %s in this FinalizeShuffleMerge message does not " +
"match with the current attempt id %s stored in shuffle service for application %s",
ATTEMPT_ID_1, ATTEMPT_ID_2, testApp));
throw e;
}
}

@Test(expected = ClosedChannelException.class)
public void testPushBlockStreamCallBackWhileNewAttemptRegistered()
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
Expand All @@ -1000,7 +1033,7 @@ void closeAndDeletePartitionFilesIfNeeded(
closed.release();
}
};
String testApp = "testPushBlockStreamCallBackWhileNewAttemptRegisters";
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
Expand Down

0 comments on commit fab51d4

Please sign in to comment.