Skip to content

Commit

Permalink
[ISSUE-456] Avoid removing resources multiple times (#459)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
If the resources of an app have already been removed, skip the removal instead of performing it a second time.

### Why are the changes needed?
When `removeResources` takes a long time for some appIds, the `expiredAppCleanupExecutorService` in ShuffleTaskManager may run its check again and detect the same appId as expired more than once.
As a result, the same appId may be added to `expiredAppIdQueue` multiple times. This PR fixes #456.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT
  • Loading branch information
xianjingfeng authored Jan 10, 2023
1 parent ebaff6a commit 3f166f4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,12 @@ public void checkLeakShuffleData() {
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
final long start = System.currentTimeMillis();
final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = shuffleTaskInfos.get(appId).getCachedBlockIds();
ShuffleTaskInfo shffleTaskInfo = shuffleTaskInfos.remove(appId);
if (shffleTaskInfo == null) {
LOG.info("Resource for appId[" + appId + "] had been removed before.");
return;
}
final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = shffleTaskInfo.getCachedBlockIds();
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
Expand All @@ -543,7 +548,6 @@ public void removeResources(String appId) {
new AppPurgeEvent(appId, getUserByAppId(appId), new ArrayList<>(shuffleToCachedBlockIds.keySet()))
);
}
shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Lists;
Expand All @@ -34,6 +36,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -512,6 +515,57 @@ public void clearTest() throws Exception {
assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).isEmpty());
}


/**
 * Registers a single shuffle for one app and then calls
 * {@code removeResources} for that app from several threads at once.
 *
 * <p>The test passes only if every call returns without throwing, all calls
 * finish within a bounded time, and afterwards the task manager reports no
 * app ids and no cached block ids for the app.
 *
 * @throws Exception if the server cannot be created or the wait is interrupted
 */
@Test
public void clearMultiTimesTest() throws Exception {
  ShuffleServerConf conf = new ShuffleServerConf();
  String storageBasePath = HDFS_URI + "rss/clearTest";
  final int shuffleId = 1;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

  ShuffleServer shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  String appId = "clearMultiTimesTest";
  shuffleTaskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY
  );
  shuffleTaskManager.refreshAppId(appId);
  assertEquals(1, shuffleTaskManager.getAppIds().size());

  shuffleTaskManager.checkResourceStatus();
  assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());

  final int removerCount = 3;
  // An exception thrown inside a worker thread never reaches the test thread,
  // so failures are counted here and asserted on after the workers finish.
  AtomicInteger failedRemovals = new AtomicInteger();
  CountDownLatch countDownLatch = new CountDownLatch(removerCount);
  for (int i = 0; i < removerCount; i++) {
    new Thread(() -> {
      try {
        shuffleTaskManager.removeResources(appId);
      } catch (RuntimeException | Error e) {
        failedRemovals.incrementAndGet();
        // Rethrow so the thread's uncaught-exception handler still reports the stack trace.
        throw e;
      } finally {
        countDownLatch.countDown();
      }
    }).start();
  }
  // Bounded wait: a stuck removal fails the test instead of hanging the build.
  assertTrue(countDownLatch.await(30, TimeUnit.SECONDS),
      "concurrent removeResources calls did not finish in time");
  assertEquals(0, failedRemovals.get(),
      "removeResources threw in at least one of the concurrent calls");
  assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
  assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty());
}

@Test
public void getBlockIdsByPartitionIdTest() {
ShuffleServerConf conf = new ShuffleServerConf();
Expand Down Expand Up @@ -753,7 +807,8 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep
assertTrue(appIdsOnDisk.contains(appId));

// make sure heartbeat timeout and resources are removed
Thread.sleep(5000);
Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
() -> shuffleTaskManager.getAppIds().size() == 0);

// Create the hidden dir to simulate LocalStorageChecker's check
String storageDir = tempDir.getAbsolutePath();
Expand Down

0 comments on commit 3f166f4

Please sign in to comment.