Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Nov 14, 2022
1 parent d63f7d5 commit 56a0026
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import java.util.List;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;

public class FixedSizeSegmentSplitter implements SegmentSplitter {
private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);

private int readBufferSize;

Expand Down Expand Up @@ -58,12 +62,13 @@ private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexDat

while (byteBuffer.hasRemaining()) {
try {
long offset = byteBuffer.getLong();
int length = byteBuffer.getInt();
int uncompressLength = byteBuffer.getInt();
long crc = byteBuffer.getLong();
long blockId = byteBuffer.getLong();
long taskAttemptId = byteBuffer.getLong();
final long offset = byteBuffer.getLong();
final int length = byteBuffer.getInt();
final int uncompressLength = byteBuffer.getInt();
final long crc = byteBuffer.getLong();
final long blockId = byteBuffer.getLong();
final long taskAttemptId = byteBuffer.getLong();

// The index file is written, read and parsed sequentially, so these parsed index segments
// index a continuous shuffle data in the corresponding data file and the first segment's
// offset field is the offset of these shuffle data in the data file.
Expand All @@ -76,6 +81,10 @@ private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexDat
// If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
// than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
if (dataFileLen != -1 && totalLength > dataFileLen) {
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.warn("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+ "the real data file length: {}(bytes). Partition id: {}. This should not happen.",
totalLength, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
break;
}

Expand Down

0 comments on commit 56a0026

Please sign in to comment.