Skip to content

Commit

Permalink
Read chunk aligned series compaction executor (apache#11745)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuwenwei authored Jan 17, 2024
1 parent 22f915f commit da79077
Show file tree
Hide file tree
Showing 16 changed files with 1,632 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.AlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.ReadChunkAlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
Expand Down Expand Up @@ -129,10 +129,16 @@ private void compactAlignedSeries(
return;
}
writer.startChunkGroup(device);
AlignedSeriesCompactionExecutor compactionExecutor =
new AlignedSeriesCompactionExecutor(
ReadChunkAlignedSeriesCompactionExecutor compactionExecutor =
new ReadChunkAlignedSeriesCompactionExecutor(
device, targetResource, readerAndChunkMetadataList, writer, summary);
compactionExecutor.execute();
for (ChunkMetadata chunkMetadata : writer.getChunkMetadataListOfCurrentDeviceInMemory()) {
if (chunkMetadata.getMeasurementUid().isEmpty()) {
targetResource.updateStartTime(device, chunkMetadata.getStartTime());
targetResource.updateEndTime(device, chunkMetadata.getEndTime());
}
}
writer.endChunkGroup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ public class CompactionTaskSummary {
protected int processChunkNum = 0;
protected int directlyFlushChunkNum = 0;
protected int deserializeChunkCount = 0;
protected int directlyFlushPageCount = 0;
protected int deserializePageCount = 0;
protected int mergedChunkNum = 0;
protected long processPointNum = 0;
protected long rewritePointNum = 0;
protected long temporalFileSize = 0;
protected int temporalFileNum = 0;

Expand Down Expand Up @@ -87,10 +89,22 @@ public void increaseDeserializedChunkNum(int increment) {
deserializeChunkCount += increment;
}

public void increaseDirectlyFlushPageNum(int increment) {
directlyFlushPageCount += increment;
}

public void increaseDeserializedPageNum(int increment) {
deserializePageCount += increment;
}

public void increaseProcessPointNum(long increment) {
processPointNum += increment;
}

public void increaseRewritePointNum(long increment) {
rewritePointNum += increment;
}

public void increaseMergedChunkNum(int increment) {
this.mergedChunkNum += increment;
}
Expand All @@ -103,10 +117,22 @@ public void setDeserializeChunkCount(int deserializeChunkCount) {
this.deserializeChunkCount = deserializeChunkCount;
}

public void setDirectlyFlushPageCount(int directlyFlushPageCount) {
this.directlyFlushPageCount = directlyFlushPageCount;
}

public void setDeserializePageCount(int deserializePageCount) {
this.deserializePageCount = deserializePageCount;
}

public void setProcessPointNum(int processPointNum) {
this.processPointNum = processPointNum;
}

public void setRewritePointNum(long rewritePointNum) {
this.rewritePointNum = rewritePointNum;
}

public int getProcessChunkNum() {
return processChunkNum;
}
Expand All @@ -119,6 +145,14 @@ public int getDeserializeChunkCount() {
return deserializeChunkCount;
}

public int getDirectlyFlushPageCount() {
return directlyFlushPageCount;
}

public int getDeserializePageCount() {
return deserializePageCount;
}

public int getMergedChunkNum() {
return mergedChunkNum;
}
Expand All @@ -127,6 +161,10 @@ public long getProcessPointNum() {
return processPointNum;
}

public long getRewritePointNum() {
return rewritePointNum;
}

enum Status {
NOT_STARTED,
STARTED,
Expand Down Expand Up @@ -157,12 +195,14 @@ public String toString() {
return String.format(
"Task start time: %s, total process chunk num: %d, "
+ "directly flush chunk num: %d, merge chunk num: %d, deserialize chunk num: %d,"
+ " total process point num: %d",
+ " directly flush page num: %d, total process point num: %d, rewrite point num: %d",
startTimeInStr,
processChunkNum,
directlyFlushChunkNum,
mergedChunkNum,
deserializeChunkCount,
processPointNum);
directlyFlushPageCount,
processPointNum,
rewritePointNum);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor;

public enum ModifiedStatus {
ALL_DELETED,
PARTIAL_DELETED,
NONE_DELETED;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.NonAlignedPageElement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
Expand All @@ -49,11 +50,6 @@
import java.util.PriorityQueue;

public abstract class SeriesCompactionExecutor {
protected enum ModifiedStatus {
ALL_DELETED,
PARTIAL_DELETED,
NONE_DELETED;
}

@FunctionalInterface
public interface RemovePage {
Expand Down
Loading

0 comments on commit da79077

Please sign in to comment.