diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 132141af72a5a..9c073a21a10f1 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -200,14 +200,17 @@ public RecoveryResponse recoverToTarget() throws IOException { boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException { final long startingSeqNo = request.startingSeqNo(); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (startingSeqNo <= endingSeqNo) { - // we need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - // operations in the required range will be available for replaying from the translog of the source + logger.trace("starting: [{}], ending: [{}}", startingSeqNo, endingSeqNo); + if (startingSeqNo - 1 <= endingSeqNo) { logger.trace( "{} waiting for all operations in the range [{}, {}] to complete", shard.shardId(), startingSeqNo, endingSeqNo); + /* + * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all + * operations in the required range will be available for replaying from the translog of the source. + */ cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); @@ -218,7 +221,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl } return tracker.getCheckpoint() >= endingSeqNo; } - return true; + return false; } /** @@ -492,7 +495,8 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); - if (operation.seqNo() < startingSeqNo) continue; + // we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number + if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue; operations.add(operation); ops++; size += operation.estimateSize();