Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Fix potenial bug when the index reading offset is greater than data length #320

Merged
merged 3 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import java.util.List;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;

public class FixedSizeSegmentSplitter implements SegmentSplitter {
private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);

private int readBufferSize;

Expand Down Expand Up @@ -58,29 +62,35 @@ private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexDat

while (byteBuffer.hasRemaining()) {
try {
long offset = byteBuffer.getLong();
int length = byteBuffer.getInt();
int uncompressLength = byteBuffer.getInt();
long crc = byteBuffer.getLong();
long blockId = byteBuffer.getLong();
long taskAttemptId = byteBuffer.getLong();
final long offset = byteBuffer.getLong();
final int length = byteBuffer.getInt();
final int uncompressLength = byteBuffer.getInt();
final long crc = byteBuffer.getLong();
final long blockId = byteBuffer.getLong();
final long taskAttemptId = byteBuffer.getLong();

// The index file is written, read and parsed sequentially, so these parsed index segments
// index a continuous shuffle data in the corresponding data file and the first segment's
// offset field is the offset of these shuffle data in the data file.
if (fileOffset == -1) {
fileOffset = offset;
}

bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
bufferOffset += length;
totalLength += length;

// If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
// than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
if (dataFileLen != -1 && totalLength >= dataFileLen) {
if (dataFileLen != -1 && totalLength > dataFileLen) {
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.warn("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+ "the real data file length: {}(bytes). Partition id: {}. This should not happen.",
totalLength, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
break;
}

bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
bufferOffset += length;

if (bufferOffset >= readBufferSize) {
ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
dataFileSegments.add(sds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
Expand All @@ -33,6 +35,22 @@

public class FixedSizeSegmentSplitterTest {

@ParameterizedTest
@ValueSource(ints = {48, 49, 57})
public void testAvoidEOFException(int dataLength) {
SegmentSplitter splitter = new FixedSizeSegmentSplitter(1000);
byte[] data = generateData(
Pair.of(32, 0),
Pair.of(16, 0),
Pair.of(10, 0)
);

List<ShuffleDataSegment> shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, dataLength));
assertEquals(1, shuffleDataSegments.size());
assertEquals(0, shuffleDataSegments.get(0).getOffset());
assertEquals(48, shuffleDataSegments.get(0).getLength());
}

@Test
public void testSplit() {
SegmentSplitter splitter = new FixedSizeSegmentSplitter(100);
Expand Down