Skip to content

Commit

Permalink
Add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xianjingfeng committed Feb 2, 2023
1 parent e75bce9 commit 67d1559
Showing 1 changed file with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,56 @@ public void clearMultiTimesTest() throws Exception {
assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty());
}

@Test
public void removeResourcesByShuffleIdsMultiTimesTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/removeResourcesByShuffleIdsMultiTimesTest";
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

ShuffleServer shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "removeResourcesByShuffleIdsMultiTimesTest";
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.refreshAppId(appId);
assertEquals(1, shuffleTaskManager.getAppIds().size());

shuffleTaskManager.checkResourceStatus();
assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());
assertEquals(1, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
CountDownLatch countDownLatch = new CountDownLatch(3);
List<Integer> shuffleIds = Lists.newArrayList(shuffleId);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
shuffleTaskManager.removeResourcesByShuffleIds(appId, shuffleIds);
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
assertEquals(0, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
}

@Test
public void getBlockIdsByPartitionIdTest() {
ShuffleServerConf conf = new ShuffleServerConf();
Expand Down

0 comments on commit 67d1559

Please sign in to comment.