Skip to content

Commit

Permalink
Fast fail when reading failed in ComposedClientReadHandler (#213)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Fast fail when reading failed in ComposedClientReadHandler

### Why are the changes needed?
`ComposedClientReadHandler` should fail fast on network failure when reading shuffle data from memory/localfile/hdfs. Otherwise, the read ends with an "inconsistent blockIds" exception, which confuses users.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UTs.
  • Loading branch information
zuston authored Sep 27, 2022
1 parent f1cb43f commit 54ddca6
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.exception;

/**
 * Unchecked exception raised when an expected file or folder cannot be found.
 *
 * <p>Note that this type shares its simple name with {@code java.io.FileNotFoundException};
 * unlike that one, it extends {@link RuntimeException}, so callers are not forced to
 * declare or catch it.
 */
public class FileNotFoundException extends RuntimeException {

  /**
   * Creates the exception with a detail message only.
   *
   * @param message description of what could not be found
   */
  public FileNotFoundException(String message) {
    super(message);
  }

  /**
   * Creates the exception with a detail message and the underlying cause.
   *
   * @param message description of what could not be found
   * @param cause the original failure that led to this exception
   */
  public FileNotFoundException(String message, Throwable cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import java.io.File;

import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * Integration test verifying that {@link ComposedClientReadHandler} fails fast with an
 * {@link RssException} when the underlying handler cannot read from the shuffle server,
 * instead of silently moving on.
 */
public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase {

  private ShuffleServerGrpcClient shuffleServerClient;

  /**
   * Starts one coordinator and one shuffle server that stores data in two local
   * directories created under a fresh temporary folder.
   */
  @BeforeAll
  public static void setupServers() throws Exception {
    CoordinatorConf coordinatorConf = getCoordinatorConf();
    createCoordinatorServer(coordinatorConf);

    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
    File tmpDir = Files.createTempDir();
    File dataDir1 = new File(tmpDir, "data1");
    File dataDir2 = new File(tmpDir, "data2");
    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
    shuffleServerConf.setString("rss.storage.basePath", basePath);
    shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000");
    createShuffleServer(shuffleServerConf);

    startServers();
  }

  /** Opens a new gRPC client to the shuffle server before every test. */
  @BeforeEach
  public void createClient() {
    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
  }

  /** Closes the gRPC client opened by {@link #createClient()}. */
  @AfterEach
  public void closeClient() {
    shuffleServerClient.close();
  }

  /**
   * Stops the shuffle server and then reads through a {@link ComposedClientReadHandler}
   * wrapping a single memory handler; the read must throw an {@link RssException}
   * that names the failing handler.
   */
  @Test
  public void testReadWhenConnectionFailedShouldThrowException() throws Exception {
    String testAppId = "testReadWhenException";
    int shuffleId = 0;
    int partitionId = 0;

    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
        testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
    ClientReadHandler[] handlers = new ClientReadHandler[] {memoryQuorumClientReadHandler};
    ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);

    // Take the server down so that the read below hits a connection failure.
    shuffleServers.get(0).stopServer();
    try {
      // The returned value is irrelevant: the call itself is expected to throw.
      ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
      fail("Should throw connection exception directly.");
    } catch (RssException rssException) {
      assertTrue(rssException.getMessage().contains("Failed to read shuffle data from HOT handler"));
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {

Expand Down Expand Up @@ -232,13 +232,7 @@ public void writeReadTest() throws Exception {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
10, 1000, "", blockIdBitmap, taskIdBitmap,
Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());

try {
readClient.readShuffleBlockData();
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Failed to read all replicas for"));
}
assertNull(readClient.readShuffleBlockData());
readClient.close();

// send 2nd commit, data will be persisted to disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,20 @@ public void readTest3() throws Exception {

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks);

ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
testAppId, 0, 0, 100, 1, 10, 1000,
"", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
FileUtils.deleteDirectory(new File(DATA_DIR1.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
FileUtils.deleteDirectory(new File(DATA_DIR2.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
// sleep to wait delete operation
Thread.sleep(2000);

try {
readClient.readShuffleBlockData();
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Failed to read all replicas for"));
}
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
testAppId, 0, 0, 100, 1, 10, 1000,
"", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
readClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
Expand Down Expand Up @@ -533,6 +534,12 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request,

builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
reply = builder.build();
} catch (FileNotFoundException indexFileNotFoundException) {
LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.",
requestInfo, indexFileNotFoundException);
reply = GetLocalShuffleIndexResponse.newBuilder()
.setStatus(valueOf(status))
.build();
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;


public class ComposedClientReadHandler implements ClientReadHandler {

private static final Logger LOG = LoggerFactory.getLogger(ComposedClientReadHandler.class);
Expand Down Expand Up @@ -127,7 +127,7 @@ public ShuffleDataResult readShuffleData() {
return null;
}
} catch (Exception e) {
LOG.error("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
throw new RssException("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
}
// when there is no data for the current handler and the upmostLevel is not reached,
// then try the next one if there is one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
Expand Down Expand Up @@ -80,7 +81,7 @@ private void prepareFilePath(
File baseFolder = new File(fullShufflePath);
if (!baseFolder.exists()) {
// the partition doesn't exist in this base folder, skip
throw new RuntimeException("Can't find folder " + fullShufflePath);
throw new FileNotFoundException("Can't find folder " + fullShufflePath);
}
File[] indexFiles;
String failedGetIndexFileMsg = "No index file found in " + storageBasePath;
Expand All @@ -93,7 +94,7 @@ public boolean accept(File dir, String name) {
}
});
} catch (Exception e) {
throw new RuntimeException(failedGetIndexFileMsg, e);
throw new FileNotFoundException(failedGetIndexFileMsg, e);
}

if (indexFiles != null && indexFiles.length > 0) {
Expand Down

0 comments on commit 54ddca6

Please sign in to comment.