Skip to content

Commit

Permalink
Iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Jan 17, 2017
1 parent c0169c2 commit 999ca91
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,17 @@ public RecoveryResponse recoverToTarget() throws IOException {
boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException {
final long startingSeqNo = request.startingSeqNo();
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (startingSeqNo <= endingSeqNo) {
// we need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
// operations in the required range will be available for replaying from the translog of the source
logger.trace("starting: [{}], ending: [{}}", startingSeqNo, endingSeqNo);
if (startingSeqNo - 1 <= endingSeqNo) {
logger.trace(
"{} waiting for all operations in the range [{}, {}] to complete",
shard.shardId(),
startingSeqNo,
endingSeqNo);
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
Expand All @@ -218,7 +221,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl
}
return tracker.getCheckpoint() >= endingSeqNo;
}
return true;
return false;
}

/**
Expand Down Expand Up @@ -492,7 +495,8 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
if (operation.seqNo() < startingSeqNo) continue;
// we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
operations.add(operation);
ops++;
size += operation.estimateSize();
Expand Down

0 comments on commit 999ca91

Please sign in to comment.