Skip to content

Commit

Permalink
Pipe: fixed the bug that region follower tsfiles can not record the I…
Browse files Browse the repository at this point in the history
…oTProgressIndex from region leader (apache#11980)
  • Loading branch information
Caideyipi committed Feb 5, 2024
1 parent eab1769 commit bc57e45
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,12 @@ public synchronized void start() {
tsFileManager.getTsFileList(true).stream()
.filter(
resource ->
// Some resource may be not closed due to the control of
// Some resource may not be closed due to the control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
resource.isClosed()
// Some different tsFiles may share the same max progressIndex, thus
// tsFiles with an "equals" max progressIndex must be transmitted to
// avoid data loss
&& !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
Expand All @@ -335,9 +338,12 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
tsFileManager.getTsFileList(false).stream()
.filter(
resource ->
// Some resource may be not closed due to the control of
// Some resource may not be closed due to the control of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
resource.isClosed()
// Some different tsFiles may share the same max progressIndex, thus
// tsFiles with an "equals" max progressIndex must be transmitted to
// avoid data loss
&& !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
Expand Down

0 comments on commit bc57e45

Please sign in to comment.