diff --git a/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java b/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java
new file mode 100644
index 0000000000..b7d1a34b5f
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+/**
+ * Unchecked exception raised when a required file or folder cannot be found.
+ *
+ * <p>This type is unrelated to {@link java.io.FileNotFoundException}: it extends
+ * {@link RuntimeException}, so callers are not forced to declare or catch it.
+ */
+public class FileNotFoundException extends RuntimeException {
+
+  /**
+   * Creates the exception with a detail message only.
+   *
+   * @param message description of what could not be found
+   */
+  public FileNotFoundException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with a detail message and the underlying cause.
+   *
+   * @param message description of what could not be found
+   * @param e cause, forwarded to {@link RuntimeException} unchanged
+   */
+  public FileNotFoundException(String message, Throwable e) {
+    super(message, e);
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
new file mode 100644
index 0000000000..03420fbab5
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Verifies that a read through {@link ComposedClientReadHandler} fails with an
+ * {@link RssException} once the shuffle server has been stopped.
+ */
+public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase {
+
+  private ShuffleServerGrpcClient shuffleServerClient;
+
+  /** Starts a coordinator and a LOCALFILE shuffle server with two base paths under a temp dir. */
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    File tmpDir = Files.createTempDir();
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000");
+    createShuffleServer(shuffleServerConf);
+
+    startServers();
+  }
+
+  @BeforeEach
+  public void createClient() {
+    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+  }
+
+  @AfterEach
+  public void closeClient() {
+    shuffleServerClient.close();
+  }
+
+  /**
+   * Stops the first shuffle server and then reads through the composed handler; the read is
+   * expected to throw rather than return a result.
+   */
+  @Test
+  public void testReadWhenConnectionFailedShouldThrowException() throws Exception {
+    String testAppId = "testReadWhenException";
+    int shuffleId = 0;
+    int partitionId = 0;
+
+    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
+        testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
+    ClientReadHandler[] handlers = new ClientReadHandler[1];
+    handlers[0] = memoryQuorumClientReadHandler;
+    ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);
+    // Stop the server before reading so the read below cannot succeed.
+    shuffleServers.get(0).stopServer();
+    try {
+      composedClientReadHandler.readShuffleData();
+      fail("Should throw connection exception directly.");
+    } catch (RssException rssException) {
+      assertTrue(rssException.getMessage().contains("Failed to read shuffle data from HOT handler"));
+    }
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index 2953290825..86e78cff3c 100644 --- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -52,8 +52,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class ShuffleWithRssClientTest extends ShuffleReadWriteBase { @@ -232,13 +232,7 @@ public void writeReadTest() throws Exception { ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1, 10, 1000, "", blockIdBitmap, taskIdBitmap, Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper()); - - try { - readClient.readShuffleBlockData(); - fail(EXPECTED_EXCEPTION_MESSAGE); - } catch (Exception e) { - assertTrue(e.getMessage().contains("Failed to read all replicas for")); - } + assertNull(readClient.readShuffleBlockData()); readClient.close(); // send 2nd commit, data will be persisted to disk diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java index 96ed80e317..f09be54a9b 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java @@ -142,25 +142,20 @@ public void readTest3() throws Exception { Map expectedData = Maps.newHashMap(); Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); - Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); List blocks = 
createShuffleBlockList( 0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI); sendTestData(testAppId, blocks); - ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), - testAppId, 0, 0, 100, 1, 10, 1000, - "", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper()); FileUtils.deleteDirectory(new File(DATA_DIR1.getAbsolutePath() + "/" + testAppId + "/0/0-0")); FileUtils.deleteDirectory(new File(DATA_DIR2.getAbsolutePath() + "/" + testAppId + "/0/0-0")); // sleep to wait delete operation Thread.sleep(2000); - try { - readClient.readShuffleBlockData(); - fail(EXPECTED_EXCEPTION_MESSAGE); - } catch (Exception e) { - assertTrue(e.getMessage().contains("Failed to read all replicas for")); - } + Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); + ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), + testAppId, 0, 0, 100, 1, 10, 1000, + "", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper()); + assertNull(readClient.readShuffleBlockData()); readClient.close(); } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 0501decab4..e08fd1c893 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -40,6 +40,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest; import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse; @@ -533,6 +534,12 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, 
builder.setIndexData(UnsafeByteOperations.unsafeWrap(data)); reply = builder.build(); + } catch (FileNotFoundException indexFileNotFoundException) { + LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.", + requestInfo, indexFileNotFoundException); + reply = GetLocalShuffleIndexResponse.newBuilder() + .setStatus(valueOf(status)) + .build(); } catch (Exception e) { status = StatusCode.INTERNAL_ERROR; msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage(); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java index f990c383f2..aab15b5d0c 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java @@ -25,9 +25,9 @@ import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.storage.handler.api.ClientReadHandler; - public class ComposedClientReadHandler implements ClientReadHandler { private static final Logger LOG = LoggerFactory.getLogger(ComposedClientReadHandler.class); @@ -127,7 +127,7 @@ public ShuffleDataResult readShuffleData() { return null; } } catch (Exception e) { - LOG.error("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e); + throw new RssException("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e); } // when is no data for current handler, and the upmostLevel is not reached, // then try next one if there has diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java index 
1eeb75bc3e..d1d0bc7df3 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java @@ -25,6 +25,7 @@ import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.storage.common.FileBasedShuffleSegment; import org.apache.uniffle.storage.handler.api.ServerReadHandler; @@ -80,7 +81,7 @@ private void prepareFilePath( File baseFolder = new File(fullShufflePath); if (!baseFolder.exists()) { // the partition doesn't exist in this base folder, skip - throw new RuntimeException("Can't find folder " + fullShufflePath); + throw new FileNotFoundException("Can't find folder " + fullShufflePath); } File[] indexFiles; String failedGetIndexFileMsg = "No index file found in " + storageBasePath; @@ -93,7 +94,7 @@ public boolean accept(File dir, String name) { } }); } catch (Exception e) { - throw new RuntimeException(failedGetIndexFileMsg, e); + throw new FileNotFoundException(failedGetIndexFileMsg, e); } if (indexFiles != null && indexFiles.length > 0) {