Skip to content

Commit

Permalink
[apache#1501] fix(server): storage selection cache accidentally delet…
Browse files Browse the repository at this point in the history
…ed when clearing stage level data. (apache#1505)

avoid storage selection cache accidentally deleted

Fix: (apache#1501)

No.

Unit tests
  • Loading branch information
dingshun3016 authored and zuston committed May 22, 2024
1 parent 03071a1 commit 4dcb067
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,12 @@ private void cleanupStorageSelectionCache(PurgeEvent event) {
Function<String, Boolean> deleteConditionFunc = null;
String prefixKey = null;
if (event instanceof AppPurgeEvent) {
prefixKey = UnionKey.buildKey(event.getAppId());
prefixKey = UnionKey.buildKey(event.getAppId(), "");
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId());
} else if (event instanceof ShufflePurgeEvent) {
int shuffleId = event.getShuffleIds().get(0);
prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId);
prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, "");
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -503,14 +504,18 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
manager.addToFlushQueue(event2);
ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null);
manager.addToFlushQueue(event3);
ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 1, null);
manager.addToFlushQueue(event5);
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
waitForFlush(manager, appId2, 2, 5);
waitForFlush(manager, appId2, 11, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality());
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
Expand All @@ -523,13 +528,23 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
manager.removeResources(appId1);
assertFalse(file.exists());

ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
assertNotNull(storageManager.selectStorage(shuffleReadEvent));

assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));

ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 11, 0, 0);
assertNull(storageManager.selectStorage(shuffle1ReadEvent));
assertNotNull(storageManager.selectStorage(shuffle11ReadEvent));

storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2)));
storageManager.removeResources(
Expand Down

0 comments on commit 4dcb067

Please sign in to comment.