diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 4a420cdf6d76..aafc405ee18a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -322,9 +322,12 @@ public synchronized void start() { tsFileManager.getTsFileList(true).stream() .filter( resource -> - // Some resource may be not closed due to the control of + // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. resource.isClosed() + // Some different tsFiles may share the same max progressIndex, thus + // tsFiles with an "equals" max progressIndex must be transmitted to + // avoid data loss && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) @@ -335,9 +338,12 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) tsFileManager.getTsFileList(false).stream() .filter( resource -> - // Some resource may be not closed due to the control of + // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. resource.isClosed() + // Some different tsFiles may share the same max progressIndex, thus + // tsFiles with an "equals" max progressIndex must be transmitted to + // avoid data loss && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))