Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyma committed Mar 6, 2024
1 parent 6de2bc9 commit 900d206
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void write(Iterator<Product2<K, V>> records) {

private void writeImpl(Iterator<Product2<K, V>> records) {
List<ShuffleBlockInfo> shuffleBlockInfos;
int recordCount = 0;
long recordCount = 0;
while (records.hasNext()) {
recordCount++;
Product2<K, V> record = records.next();
Expand All @@ -266,7 +266,7 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
shuffleBlockInfos = bufferManager.clear();
processShuffleBlockInfos(shuffleBlockInfos);
long s = System.currentTimeMillis();
assert recordCount == bufferManager.getRecordCount();
checkSentRecordCount(recordCount);
checkBlockSendResult(blockIds);
final long checkDuration = System.currentTimeMillis() - s;
long commitDuration = 0;
Expand Down Expand Up @@ -294,6 +294,16 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
+ bufferManager.getManagerCostInfo());
}

private void checkSentRecordCount(long recordCount) {
if (recordCount != bufferManager.getRecordCount()) {
String errorMsg =
"Some records might have been lost when preparing to send blocks for task["
+ taskId
+ "]";
throw new RssSendFailedException(errorMsg);
}
}

/**
* ShuffleBlock will be added to queue and send to shuffle server maintenance the following
* information: 1. add blockId to set, check if it is send later 2. update shuffle server info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
if (isCombine) {
createCombiner = shuffleDependency.aggregator().get().createCombiner();
}
int recordCount = 0;
long recordCount = 0;
while (records.hasNext()) {
recordCount++;
// Task should fast fail when sending data failed
Expand All @@ -287,7 +287,7 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
processShuffleBlockInfos(shuffleBlockInfos);
}
long checkStartTs = System.currentTimeMillis();
assert recordCount == bufferManager.getRecordCount();
checkSentRecordCount(recordCount);
checkBlockSendResult(blockIds);
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
Expand All @@ -313,6 +313,16 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
+ bufferManager.getManagerCostInfo());
}

private void checkSentRecordCount(long recordCount) {
if (recordCount != bufferManager.getRecordCount()) {
String errorMsg =
"Some records might have been lost when preparing to send blocks for task["
+ taskId
+ "]";
throw new RssSendFailedException(errorMsg);
}
}

// only push-based shuffle use this interface, but rss won't be used when push-based shuffle is
// enabled.
public long[] getPartitionLengths() {
Expand Down

0 comments on commit 900d206

Please sign in to comment.