Skip to content

Commit

Permalink
BACKPORT: [apache#1608][part-6][FOLLOWUP] improvement(client): Check …
Browse files Browse the repository at this point in the history
…blockId num after blocks all sent apache#1761

apache#1761
  • Loading branch information
zuston committed Jun 4, 2024
1 parent bd28be9 commit c1eb77d
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
long checkStartTs = System.currentTimeMillis();
checkAllBufferSpilled();
checkSentRecordCount(recordCount);
checkBlockSendResult(new HashSet<>(blockIds));
checkSentBlockCount();
checkBlockSendResult(blockIds);
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
if (!isMemoryShuffleEnabled) {
Expand Down Expand Up @@ -408,17 +408,26 @@ private void checkSentRecordCount(long recordCount) {
}

private void checkSentBlockCount() {
long tracked = 0;
if (serverToPartitionToBlockIds != null) {
Set<Long> blockIds = new HashSet<>();
for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {
partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
}
tracked = blockIds.size();
long expected = blockIds.size();
long bufferManagerTracked = bufferManager.getBlockCount();

assert serverToPartitionToBlockIds != null;
// to filter the multiple replica's duplicate blockIds
Set<Long> blockIds = new HashSet<>();
for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {
partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
}
if (tracked != bufferManager.getBlockCount()) {
long serverTracked = blockIds.size();
if (expected != serverTracked || expected != bufferManagerTracked) {
throw new RssSendFailedException(
"Potential block loss may occur when preparing to send blocks for task[" + taskId + "]");
"Potential block loss may occur for task["
+ taskId
+ "]. BlockId number expected: "
+ expected
+ ", serverTracked: "
+ serverTracked
+ ", bufferManagerTracked: "
+ bufferManagerTracked);
}
}

Expand Down

0 comments on commit c1eb77d

Please sign in to comment.