From a1eb03218626a976a364b180fe973846f0c1f5c9 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 2 Feb 2023 15:15:26 +0800 Subject: [PATCH 1/6] [Improvement] Only report to the shuffle servers that owns the blocks --- .../client/impl/ShuffleWriteClientImpl.java | 18 +++++-- .../test/ShuffleWithRssClientTest.java | 51 +++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 25f4a1012a..06de328b0f 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -520,18 +520,28 @@ public void reportShuffleResult( Map> groupedPartitions = Maps.newHashMap(); Map partitionReportTracker = Maps.newHashMap(); for (Map.Entry> entry : partitionToServers.entrySet()) { + int partitionIdx = entry.getKey(); for (ShuffleServerInfo ssi : entry.getValue()) { if (!groupedPartitions.containsKey(ssi)) { - groupedPartitions.putIfAbsent(ssi, Lists.newArrayList()); + groupedPartitions.put(ssi, Lists.newArrayList()); } - groupedPartitions.get(ssi).add(entry.getKey()); + groupedPartitions.get(ssi).add(partitionIdx); + } + if (CollectionUtils.isNotEmpty(partitionToBlockIds.getOrDefault(partitionIdx, null))) { + partitionReportTracker.putIfAbsent(partitionIdx, 0); } - partitionReportTracker.putIfAbsent(entry.getKey(), 0); } + for (Map.Entry> entry : groupedPartitions.entrySet()) { Map> requestBlockIds = Maps.newHashMap(); for (Integer partitionId : entry.getValue()) { - requestBlockIds.put(partitionId, partitionToBlockIds.get(partitionId)); + List partitions = partitionToBlockIds.get(partitionId); + if (CollectionUtils.isNotEmpty(partitions)) { + requestBlockIds.put(partitionId, partitions); + } + } + if (requestBlockIds.isEmpty()) { + continue; } RssReportShuffleResultRequest request = new RssReportShuffleResultRequest( appId, shuffleId, taskAttemptId, requestBlockIds, bitmapNum); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index 0a7e122752..9994828133 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -150,6 +150,57 @@ public void rpcFailTest() throws Exception { assertEquals(blockIdBitmap, report); } + @Test + public void reportBlocksToShuffleServerIfNecessary() { + String testAppId = "reportBlocksToShuffleServerIfNecessary_appId"; + + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo1, + testAppId, + 1, + Lists.newArrayList(new PartitionRange(1, 1)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); + + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo2, + testAppId, + 1, + Lists.newArrayList(new PartitionRange(2, 2)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); + + Map> partitionToServers = Maps.newHashMap(); + partitionToServers.putIfAbsent(1, Lists.newArrayList(shuffleServerInfo1)); + partitionToServers.putIfAbsent(2, Lists.newArrayList(shuffleServerInfo2)); + Map> partitionToBlocks = Maps.newHashMap(); + List blockIds = Lists.newArrayList(); + + int partitionIdx = 1; + for (int i = 0; i < 5; i++) { + blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i)); + } + partitionToBlocks.put(partitionIdx, blockIds); + + // case1 + shuffleWriteClientImpl + .reportShuffleResult(partitionToServers, testAppId, 1, 0, partitionToBlocks, 1); + Roaring64NavigableMap bitmap = shuffleWriteClientImpl + .getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, + 1, 0); + assertTrue(bitmap.isEmpty()); + + bitmap = shuffleWriteClientImpl + .getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, + 1, partitionIdx); + assertEquals(5, bitmap.getLongCardinality()); + for (int i = 0; i < 5; i++) { + assertTrue(bitmap.contains(partitionToBlocks.get(1).get(i))); + } + } + @Test public void reportMultipleServerTest() throws Exception { String testAppId = "reportMultipleServerTest"; From a094464e75851ed6a80052775e8c8f940cba7711 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 2 Feb 2023 16:04:52 +0800 Subject: [PATCH 2/6] fix --- .../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 06de328b0f..05feaf6993 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -551,7 +551,7 @@ public void reportShuffleResult( if (response.getStatusCode() == ResponseStatusCode.SUCCESS) { LOG.info("Report shuffle result to " + ssi + " for appId[" + appId + "], shuffleId[" + shuffleId + "] successfully"); - for (Integer partitionId : entry.getValue()) { + for (Integer partitionId : requestBlockIds.keySet()) { partitionReportTracker.put(partitionId, partitionReportTracker.get(partitionId) + 1); } } else { From 7f4eef3f721f38d9fbd7f04891c70e83b4237d90 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 3 Feb 2023 10:07:07 +0800 Subject: [PATCH 3/6] fix again --- .../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 05feaf6993..9f16f4c935 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -527,7 +527,7 @@ public void reportShuffleResult( } groupedPartitions.get(ssi).add(partitionIdx); } - if (CollectionUtils.isNotEmpty(partitionToBlockIds.getOrDefault(partitionIdx, null))) { + if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) { partitionReportTracker.putIfAbsent(partitionIdx, 0); } } From 58ff4d1cf9dcef2986c6793c5d5bb393f93b2e79 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 3 Feb 2023 10:26:45 +0800 Subject: [PATCH 4/6] fix --- .../org/apache/uniffle/test/ShuffleWithRssClientTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index 9994828133..440adfbbab 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -173,8 +173,8 @@ public void reportBlocksToShuffleServerIfNecessary() { ); Map> partitionToServers = Maps.newHashMap(); - partitionToServers.putIfAbsent(1, Lists.newArrayList(shuffleServerInfo1)); - partitionToServers.putIfAbsent(2, Lists.newArrayList(shuffleServerInfo2)); + partitionToServers.put(1, Lists.newArrayList(shuffleServerInfo1)); + partitionToServers.put(2, Lists.newArrayList(shuffleServerInfo2)); Map> partitionToBlocks = Maps.newHashMap(); List blockIds = Lists.newArrayList(); From 702e354641e13709ae25cfd44005e10c2ae34823 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 3 Feb 2023 15:54:16 +0800 Subject: [PATCH 5/6] optimize --- .../uniffle/client/impl/ShuffleWriteClientImpl.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 9f16f4c935..ce7b4fb06f 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -522,9 +522,7 @@ public void reportShuffleResult( for (Map.Entry> entry : partitionToServers.entrySet()) { int partitionIdx = entry.getKey(); for (ShuffleServerInfo ssi : entry.getValue()) { - if (!groupedPartitions.containsKey(ssi)) { - groupedPartitions.put(ssi, Lists.newArrayList()); - } + groupedPartitions.putIfAbsent(ssi, Lists.newArrayList()); groupedPartitions.get(ssi).add(partitionIdx); } if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) { @@ -535,9 +533,9 @@ public void reportShuffleResult( for (Map.Entry> entry : groupedPartitions.entrySet()) { Map> requestBlockIds = Maps.newHashMap(); for (Integer partitionId : entry.getValue()) { - List partitions = partitionToBlockIds.get(partitionId); - if (CollectionUtils.isNotEmpty(partitions)) { - requestBlockIds.put(partitionId, partitions); + List blockIds = partitionToBlockIds.get(partitionId); + if (CollectionUtils.isNotEmpty(blockIds)) { + requestBlockIds.put(partitionId, blockIds); } } if (requestBlockIds.isEmpty()) { From 174e0b20f666f305211f652d49a6d20a3c413c86 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 3 Feb 2023 16:03:09 +0800 Subject: [PATCH 6/6] revert --- .../apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index ce7b4fb06f..5387708865 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -522,7 +522,9 @@ public void reportShuffleResult( for (Map.Entry> entry : partitionToServers.entrySet()) { int partitionIdx = entry.getKey(); for (ShuffleServerInfo ssi : entry.getValue()) { - groupedPartitions.putIfAbsent(ssi, Lists.newArrayList()); + if (!groupedPartitions.containsKey(ssi)) { + groupedPartitions.put(ssi, Lists.newArrayList()); + } groupedPartitions.get(ssi).add(partitionIdx); } if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) {