Skip to content

Commit

Permalink
fix PointPriorityReader calculation of aligned series null value num (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei authored Jan 22, 2024
1 parent 85e71a4 commit 8885df3
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private void fillAlignedNullValue() {
pointElementsWithSameTimestamp.add(pointQueue.poll());

TsPrimitiveType[] currentValues = currentPoint.getValue().getVector();
int nullValueNum = currentValues.length;
while (!pointQueue.isEmpty()) {
int nullValueNum = currentValues.length;
if (pointQueue.peek().timestamp > lastTime) {
// the smallest time of all pages is later than the last time, then break the loop
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction;

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
Expand All @@ -30,14 +31,27 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
Expand All @@ -51,6 +65,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -1886,4 +1901,97 @@ public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries() throws Exce
}
}
}

@Test
public void testMergeAlignedSeriesTimeValuePairFromDifferentFiles()
throws IOException, IllegalPathException {
TsFileResource resource1 = createEmptyFileAndResource(false);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource1)) {
writer.startChunkGroup("d1");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
new TimeRange[] {new TimeRange(10, 20)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(true, true, true, false, false));
writer.endChunkGroup();
writer.endFile();
}
TsFileResource resource2 = createEmptyFileAndResource(false);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource2)) {
writer.startChunkGroup("d1");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
new TimeRange[] {new TimeRange(10, 20)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(true, true, false, true, true));
writer.endChunkGroup();
writer.endFile();
}
TsFileResource resource3 = createEmptyFileAndResource(false);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource3)) {
writer.startChunkGroup("d1");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
new TimeRange[] {new TimeRange(10, 20)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, true, true, true, true));
writer.endChunkGroup();
writer.endFile();
}
TsFileResource resource4 = createEmptyFileAndResource(false);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource4)) {
writer.startChunkGroup("d1");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
new TimeRange[] {new TimeRange(10, 20)},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(true, false, true, true, true));
writer.endChunkGroup();
writer.endFile();
}

unseqResources.add(resource1);
unseqResources.add(resource2);
unseqResources.add(resource3);
unseqResources.add(resource4);

InnerSpaceCompactionTask task =
new InnerSpaceCompactionTask(
0, tsFileManager, unseqResources, false, new FastCompactionPerformer(false), 0);
Assert.assertTrue(task.start());

TsFileResource targetResource = tsFileManager.getTsFileList(false).get(0);
try (TsFileSequenceReader reader = new TsFileSequenceReader(targetResource.getTsFilePath())) {
List<AlignedChunkMetadata> chunkMetadataList =
reader.getAlignedChunkMetadata("root.testsg.d1");
for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
ChunkMetadata timeChunkMetadata =
(ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata();
Chunk timeChunk = reader.readMemChunk(timeChunkMetadata);
List<Chunk> valueChunks = new ArrayList<>();
for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
Chunk valueChunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
valueChunks.add(valueChunk);
}
AlignedChunkReader alignedChunkReader =
new AlignedChunkReader(timeChunk, valueChunks, null);
while (alignedChunkReader.hasNextSatisfiedPage()) {
BatchData batchData = alignedChunkReader.nextPageData();
IPointReader pointReader = batchData.getBatchDataIterator();
while (pointReader.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
for (Object value : timeValuePair.getValues()) {
if (value == null) {
Assert.fail();
}
}
}
}
}
}
}
}

0 comments on commit 8885df3

Please sign in to comment.