Skip to content

Commit

Permalink
[IOTDB-6176] Pipe: record SimpleProgressIndex for insertion when usin…
Browse files Browse the repository at this point in the history
…g SimpleConsensus to avoid wrong advance progress report (#11255)
  • Loading branch information
Caideyipi authored Oct 10, 2023
1 parent 18a4d91 commit 7c976f1
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

Expand Down Expand Up @@ -95,8 +96,8 @@ public ServiceType getID() {

////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////

public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) {
simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) {
simpleConsensusProgressIndexAssigner.assignIfNeeded(insertNode);
}

////////////////////// Recover ProgressIndex Assigner //////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,7 +54,7 @@ public class SimpleConsensusProgressIndexAssigner {
private boolean isSimpleConsensusEnable = false;

private int rebootTimes = 0;
private final AtomicLong memtableFlushOrderId = new AtomicLong(0);
private final AtomicLong insertionRequestId = new AtomicLong(0);

public void start() throws StartupException {
isSimpleConsensusEnable =
Expand Down Expand Up @@ -98,16 +98,16 @@ private void recordRebootTimes() throws IOException {
FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), StandardCharsets.UTF_8);
}

public void assignIfNeeded(TsFileResource tsFileResource) {
public void assignIfNeeded(InsertNode insertNode) {
if (!isSimpleConsensusEnable) {
return;
}

tsFileResource.updateProgressIndex(
new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement()));
insertNode.setProgressIndex(
new SimpleProgressIndex(rebootTimes, insertionRequestId.getAndIncrement()));
}

public SimpleProgressIndex getSimpleProgressIndexForTsFileRecovery() {
return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement());
return new SimpleProgressIndex(rebootTimes, insertionRequestId.getAndIncrement());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,7 @@ private void extractTsFileInsertion(PipeRealtimeEvent event) {
break;
case USING_TABLET:
// All the tablet events have been extracted, so we can ignore the tsFile event.
// Report this event for SimpleProgressIndex, which does not have progressIndex for wal.
// This report won't affect IoTProgressIndex since the previous wal events have been
// successfully transferred here.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), true);
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false);
break;
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException {

startTime = System.nanoTime();

PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(insertRowNode);
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionInfo.getDataRegion().getDataRegionId(),
Expand Down Expand Up @@ -387,6 +388,7 @@ public void insertTablet(

startTime = System.nanoTime();

PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(insertTabletNode);
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionInfo.getDataRegion().getDataRegionId(),
Expand Down Expand Up @@ -896,7 +898,6 @@ public void asyncClose() {
IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;

try {
PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(
dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource, false);
Expand Down

0 comments on commit 7c976f1

Please sign in to comment.