diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index b1231f0ce9..da6e5ae5aa 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -18,6 +18,7 @@ package org.apache.uniffle.server.storage; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -25,15 +26,22 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** * The class is to test the {@link LocalStorageManager} @@ -50,6 +58,65 @@ public static void clear() { ShuffleServerMetrics.clear(); } + private ShuffleDataFlushEvent toDataFlushEvent(String appId, int shuffleId, int startPartition) { + return new ShuffleDataFlushEvent( + 1, // event id + appId, // appId + shuffleId, // shuffle id + startPartition, // startPartition + 1, // endPartition + 1, // size + new ArrayList(), // shuffleBlocks + null, // valid + null // shuffleBuffer + ); + } + + @Test + public void testStorageSelection() { + String[] storagePaths = {"/tmp/rss-data1", "/tmp/rss-data2"}; + + ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); + conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); + conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); + LocalStorageManager localStorageManager = new LocalStorageManager(conf); + + List storages = localStorageManager.getStorages(); + assertNotNull(storages); + + String appId = "testCorruptedStorageApp"; + + // case1: no corrupted storage, flush and read event of the same appId and shuffleId and startPartition + // will always get the same storage + ShuffleDataFlushEvent dataFlushEvent1 = toDataFlushEvent(appId, 1, 1); + Storage storage1 = localStorageManager.selectStorage(dataFlushEvent1); + + ShuffleDataFlushEvent dataFlushEvent2 = toDataFlushEvent(appId, 1, 1); + Storage storage2 = localStorageManager.selectStorage(dataFlushEvent2); + + ShuffleDataReadEvent dataReadEvent = new ShuffleDataReadEvent(appId, 1, 1); + Storage storage3 = localStorageManager.selectStorage(dataReadEvent); + assertEquals(storage1, storage2); + assertEquals(storage1, storage3); + + // case2: one storage is corrupted, and it will switch to other storage at the first time of writing + // event of (appId, shuffleId, startPartition) + ((LocalStorage)storage1).markCorrupted(); + Storage storage4 = localStorageManager.selectStorage(dataFlushEvent1); + assertNotEquals(storage4.getStoragePath(), storage1.getStoragePath()); + assertEquals(localStorageManager.selectStorage(dataReadEvent), storage4); + + // case3: one storage is corrupted when it happened after the original event has been written, + // so it will switch to another storage for write and read event. + LocalStorage mockedStorage = spy((LocalStorage)storage1); + when(mockedStorage.containsWriteHandler(appId, 1, 1)).thenReturn(true); + Storage storage5 = localStorageManager.selectStorage(dataFlushEvent1); + Storage storage6 = localStorageManager.selectStorage(dataReadEvent); + assertNotEquals(storage1, storage5); + assertEquals(storage5, storage6); + } + @Test public void testInitLocalStorageManager() { String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"};