Skip to content

Commit

Permalink
[#1464] improvement(client): Improve the error log message for checkB…
Browse files Browse the repository at this point in the history
…lockSendResult
  • Loading branch information
rickyma committed Jan 17, 2024
1 parent 758a1f1 commit 64cc170
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Maps;
import scala.Option;
import scala.Tuple2;
import scala.collection.Iterator;
Expand Down Expand Up @@ -658,6 +660,13 @@ public void addFailedBlockIds(String taskId, Set<Long> blockIds) {
taskToFailedBlockIds.get(taskId).addAll(blockIds);
}

@VisibleForTesting
public void addTaskToFailedBlockIdsAndServer(String taskId, Long blockId, ShuffleServerInfo shuffleServerInfo) {
taskToFailedBlockIdsAndServer.putIfAbsent(taskId, Maps.newHashMap());
taskToFailedBlockIdsAndServer.get(taskId).putIfAbsent(blockId, new LinkedBlockingDeque<>());
taskToFailedBlockIdsAndServer.get(taskId).get(blockId).add(shuffleServerInfo);
}

@VisibleForTesting
public void addSuccessBlockIds(String taskId, Set<Long> blockIds) {
if (taskToSuccessBlockIds.get(taskId) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public void checkBlockSendResultTest() {
// case 3: partial blocks are sent failed, Runtime exception will be thrown
manager.addSuccessBlockIds(taskId, Sets.newHashSet(1L, 2L));
manager.addFailedBlockIds(taskId, Sets.newHashSet(3L));
ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("127.0.0.1", 20001);
manager.addTaskToFailedBlockIdsAndServer(taskId, 3L, shuffleServerInfo);
Throwable e3 =
assertThrows(
RuntimeException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -85,10 +86,11 @@ public void checkBlockSendResultTest() {
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "127.0.0.1:12345,127.0.0.1:12346");
Map<String, Set<Long>> failBlocks = JavaUtils.newConcurrentMap();
Map<String, Set<Long>> successBlocks = JavaUtils.newConcurrentMap();
Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>> taskToFailedBlockIdsAndServer = JavaUtils.newConcurrentMap();
Serializer kryoSerializer = new KryoSerializer(conf);
RssShuffleManager manager =
TestUtils.createShuffleManager(
conf, false, null, successBlocks, failBlocks, JavaUtils.newConcurrentMap());
conf, false, null, successBlocks, failBlocks, taskToFailedBlockIdsAndServer);

ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
Expand Down Expand Up @@ -149,6 +151,12 @@ public void checkBlockSendResultTest() {
// case 3: partial blocks are sent failed, Runtime exception will be thrown
successBlocks.put("taskId", Sets.newHashSet(1L, 2L));
failBlocks.put("taskId", Sets.newHashSet(3L));
Map<Long, BlockingQueue<ShuffleServerInfo>> blockIdToShuffleServerInfoMap = JavaUtils.newConcurrentMap();
BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("127.0.0.1", 20001);
blockingQueue.add(shuffleServerInfo);
blockIdToShuffleServerInfoMap.put(3L, blockingQueue);
taskToFailedBlockIdsAndServer.put("taskId", blockIdToShuffleServerInfoMap);
Throwable e3 =
assertThrows(
RuntimeException.class,
Expand Down

0 comments on commit 64cc170

Please sign in to comment.