Skip to content

Commit

Permalink
[ISSUE-329][Bug] Catch NPE in org.apache.uniffle.server.ShuffleTaskMa…
Browse files Browse the repository at this point in the history
…nager#addFinishedBlockIds (#331)

### What changes were proposed in this pull request?
If app expired, thrown a runtime exception with message

### Why are the changes needed?
`reportShuffleResult` after app expired in shuffle server will throw NPE #329  

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added
  • Loading branch information
xianjingfeng authored Nov 16, 2022
1 parent 4004f44 commit 9f0610a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ public void addFinishedBlockIds(
String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
refreshAppId(appId);
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
throw new RuntimeException("appId[" + appId + "] is expired!");
}
if (!shuffleIdToPartitions.containsKey(shuffleId)) {
Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
for (int i = 0; i < bitmapNum; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,42 @@ public void testGetFinishedBlockIds() throws Exception {
assertEquals(expectedBlockIds, resBlockIds);
}


@Test
public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/test";
String appId = "testAddFinishedBlockIdsToExpiredApp";
final int shuffleId = 1;
final int bitNum = 3;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
shuffleBufferManager, storageManager);
Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
try {
shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
fail("Exception should be thrown");
} catch (RuntimeException e) {
assertTrue(e.getMessage().equals("appId[" + appId + "] is expired!"));
}
}

// copy from ClientUtils
private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
Expand Down

0 comments on commit 9f0610a

Please sign in to comment.