diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 459a258cba..0bcb6b924b 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -316,12 +316,12 @@ private void cleanupStorageSelectionCache(PurgeEvent event) { Function deleteConditionFunc = null; String prefixKey = null; if (event instanceof AppPurgeEvent) { - prefixKey = UnionKey.buildKey(event.getAppId()); + prefixKey = UnionKey.buildKey(event.getAppId(), ""); deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); } else if (event instanceof ShufflePurgeEvent) { int shuffleId = event.getShuffleIds().get(0); - prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId); + prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, ""); deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java index 0747b35244..2ae23d0a18 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java @@ -73,6 +73,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -503,14 +504,18 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { manager.addToFlushQueue(event2); ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null); manager.addToFlushQueue(event3); + ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 1, null); + manager.addToFlushQueue(event5); assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2)); final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1); waitForFlush(manager, appId1, 1, 5); waitForFlush(manager, appId2, 1, 5); waitForFlush(manager, appId2, 2, 5); + waitForFlush(manager, appId2, 11, 5); assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality()); + assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality()); assertEquals(2, storage.getHandlerSize()); File file = new File(tempDir, appId1); assertTrue(file.exists()); @@ -523,6 +528,10 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1))); manager.removeResources(appId1); assertFalse(file.exists()); + + ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0); + assertNotNull(storageManager.selectStorage(shuffleReadEvent)); + assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality()); @@ -530,6 +539,12 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { manager.removeResources(appId2); storageManager.removeResources( new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))); + + ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0); + ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 11, 0, 0); + assertNull(storageManager.selectStorage(shuffle1ReadEvent)); + assertNotNull(storageManager.selectStorage(shuffle11ReadEvent)); + storageManager.removeResources( new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2))); storageManager.removeResources(