From 2115f4ad6b24111423b23fb71f81dda6d5729682 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 28 Jun 2017 14:25:09 -0400 Subject: [PATCH 01/16] Throw back replica local checkpoint on new primary This commit causes a replica to throwback its local checkpoint to the global checkpoint when learning of a new primary through a replica operation. --- .../index/seqno/LocalCheckpointTracker.java | 13 ++ .../index/seqno/SequenceNumbersService.java | 9 ++ .../elasticsearch/index/shard/IndexShard.java | 9 ++ .../seqno/LocalCheckpointTrackerTests.java | 38 ++++-- .../index/shard/IndexShardTests.java | 112 ++++++++++++++---- 5 files changed, 153 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index a1ad5dfa6ff79..368f136754fb3 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -121,6 +121,19 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { } } + /** + * Resets the checkpoint to the specified value. + * + * @param checkpoint the local checkpoint to reset this tracker to + */ + synchronized void resetCheckpoint(final long checkpoint) { + assert checkpoint <= this.checkpoint; + processedSeqNo.clear(); + firstProcessedSeqNo = checkpoint + 1; + nextSeqNo = checkpoint + 1; + this.checkpoint = checkpoint; + } + /** * The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}. * diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 6d8b87599a125..05f5fea2dc252 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -106,6 +106,15 @@ public void markSeqNoAsCompleted(final long seqNo) { localCheckpointTracker.markSeqNoAsCompleted(seqNo); } + /** + * Resets the local checkpoint to the specified value. + * + * @param localCheckpoint the local checkpoint to reset to + */ + public void resetLocalCheckpoint(final long localCheckpoint) { + localCheckpointTracker.resetCheckpoint(localCheckpoint); + } + /** * The current sequence number stats. * diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index db0f27a28ca75..3b13edbc92543 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2057,6 +2057,15 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final assert operationPrimaryTerm > primaryTerm : "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; + logger.trace( + "detected new primary with primary term [{}], " + + "resetting local checkpoint from [{}] to [{}], " + + "updating global checkpoint to [{}]", + operationPrimaryTerm, + getLocalCheckpoint(), + globalCheckpoint, + globalCheckpoint); + getEngine().seqNoService().resetLocalCheckpoint(globalCheckpoint); updateGlobalCheckpointOnReplica(globalCheckpoint); getEngine().getTranslog().rollGeneration(); }); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 3d280b4d28c98..2fae97283f04a 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -38,8 +38,10 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.not; public class LocalCheckpointTrackerTests extends ESTestCase { @@ -49,14 +51,14 @@ public class LocalCheckpointTrackerTests extends ESTestCase { public static LocalCheckpointTracker createEmptyTracker() { return new LocalCheckpointTracker( - IndexSettingsModule.newIndexSettings( - "test", - Settings - .builder() - .put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) - .build()), - SequenceNumbersService.NO_OPS_PERFORMED, - SequenceNumbersService.NO_OPS_PERFORMED + IndexSettingsModule.newIndexSettings( + "test", + Settings + .builder() + .put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) + .build()), + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED ); } @@ -236,4 +238,24 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte thread.join(); } + + public void testResetCheckpoint() { + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + for (int i = 0; i < operations; i++) { + if (!rarely()) { + tracker.markSeqNoAsCompleted(i); + } + } + + final int localCheckpoint = + randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint())); + tracker.resetCheckpoint(localCheckpoint); + assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); + assertThat(tracker.getMaxSeqNo(), equalTo((long) localCheckpoint)); + assertThat(tracker.processedSeqNo, empty()); + assertThat(tracker.generateSeqNo(), equalTo((long) (localCheckpoint + 1))); + tracker.markSeqNoAsCompleted((long) (localCheckpoint + 1)); + assertThat(tracker.processedSeqNo, not(empty())); + assertThat(tracker.processedSeqNo.peek().get(0), equalTo(true)); + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9093274a491cb..8a0925c977b26 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -142,7 +142,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; /** @@ -405,26 +404,10 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { // most of the time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); - int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); - boolean gap = false; - for (int i = 0; i < operations; i++) { - if (!rarely()) { - final String id = Integer.toString(i); - SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id, - new BytesArray("{}"), XContentType.JSON); - indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(), - 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, - getMappingUpdater(indexShard, sourceToParse.type())); - max = i; - } else { - gap = true; - } - } + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); - final int maxSeqNo = max; - if (gap) { - assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo))); - } + final int maxSeqNo = result.maxSeqNo; + final boolean gap = result.gap; // promote the replica final ShardRouting replicaRouting = indexShard.routingEntry(); @@ -637,6 +620,7 @@ public void onFailure(Exception e) { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); releasable.close(); @@ -697,6 +681,7 @@ private void finish() { assertTrue(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } } @@ -707,6 +692,44 @@ private void finish() { closeShards(indexShard); } + public void testThrowbackLocalCheckpointOnReplica() throws IOException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + + // most of the time this is large enough that most of the time there will be at least one gap + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); + + final int globalCheckpoint = + randomIntBetween( + Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), + Math.toIntExact(indexShard.getLocalCheckpoint())); + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquireReplicaOperationPermit( + indexShard.primaryTerm + 1, + globalCheckpoint, + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.SAME); + + latch.await(); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) globalCheckpoint)); + + // ensure that after the local checkpoint throwback and indexing again, the local checkpoint advances + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); + + closeShards(indexShard); + } + public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { final IndexShard indexShard = newStartedShard(false); @@ -1966,6 +1989,55 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept closeShards(newShard); } + class Result { + private final int localCheckpoint; + private final int maxSeqNo; + private final boolean gap; + + public Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) { + this.localCheckpoint = localCheckpoint; + this.maxSeqNo = maxSeqNo; + this.gap = gap; + } + } + + /** + * Index on the specified shard while introducing sequence number gaps. + * + * @param indexShard the shard + * @param operations the number of operations + * @param offset the starting sequence number + * @return a pair of the maximum sequence number and whether or not a gap was introduced + * @throws IOException if an I/O exception occurs while indexing on the shard + */ + private Result indexOnReplicaWithGaps( + final IndexShard indexShard, + final int operations, + final int offset) throws IOException { + int localCheckpoint = offset; + int max = offset; + boolean gap = false; + for (int i = offset + 1; i < operations; i++) { + if (!rarely()) { + final String id = Integer.toString(i); + SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id, + new BytesArray("{}"), XContentType.JSON); + indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(), + 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, + getMappingUpdater(indexShard, sourceToParse.type())); + if (!gap && i == localCheckpoint + 1) { + localCheckpoint++; + } + max = i; + } else { + gap = true; + } + } + assert localCheckpoint == indexShard.getLocalCheckpoint(); + assert !gap || (localCheckpoint != max); + return new Result(localCheckpoint, max, gap); + } + /** A dummy repository for testing which just needs restore overridden */ private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { private final String indexName; From 8f74e926d18d7ac40c735a70a99674ad26103927 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 28 Jun 2017 14:53:10 -0400 Subject: [PATCH 02/16] Checkstyle --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8a0925c977b26..5941fbe8b174e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1994,7 +1994,7 @@ class Result { private final int maxSeqNo; private final boolean gap; - public Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) { + Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) { this.localCheckpoint = localCheckpoint; this.maxSeqNo = maxSeqNo; this.gap = gap; From 5e9d79fda931ef91c8104597493ebb5337f324c9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 30 Jun 2017 12:32:25 -0400 Subject: [PATCH 03/16] Fix order and beef up test --- .../org/elasticsearch/index/shard/IndexShard.java | 2 +- .../elasticsearch/index/shard/IndexShardTests.java | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3b13edbc92543..af4a0d53d8cf0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2065,8 +2065,8 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final getLocalCheckpoint(), globalCheckpoint, globalCheckpoint); - getEngine().seqNoService().resetLocalCheckpoint(globalCheckpoint); updateGlobalCheckpointOnReplica(globalCheckpoint); + getEngine().seqNoService().resetLocalCheckpoint(getGlobalCheckpoint()); getEngine().getTranslog().rollGeneration(); }); globalCheckpointUpdated = true; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5941fbe8b174e..ec2b7b26996bd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -692,13 +692,19 @@ private void finish() { closeShards(indexShard); } - public void testThrowbackLocalCheckpointOnReplica() throws IOException, InterruptedException { + public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); // most of the time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); + final long globalCheckpointOnReplica = + randomIntBetween( + Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), + Math.toIntExact(indexShard.getLocalCheckpoint())); + indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica); + final int globalCheckpoint = randomIntBetween( Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), @@ -721,9 +727,9 @@ public void onFailure(Exception e) { ThreadPool.Names.SAME); latch.await(); - assertThat(indexShard.getLocalCheckpoint(), equalTo((long) globalCheckpoint)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); - // ensure that after the local checkpoint throwback and indexing again, the local checkpoint advances + // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); From 1e7cee99eb80de30303930b0d825e31627965007 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 3 Jul 2017 16:02:39 -0400 Subject: [PATCH 04/16] Do not reset max seq no --- .../index/seqno/LocalCheckpointTracker.java | 1 - .../index/seqno/LocalCheckpointTrackerTests.java | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 368f136754fb3..867f2241416b6 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -130,7 +130,6 @@ synchronized void resetCheckpoint(final long checkpoint) { assert checkpoint <= this.checkpoint; processedSeqNo.clear(); firstProcessedSeqNo = checkpoint + 1; - nextSeqNo = checkpoint + 1; this.checkpoint = checkpoint; } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 2fae97283f04a..bfbb8c7440e82 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -241,9 +241,11 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte public void testResetCheckpoint() { final int operations = 1024 - scaledRandomIntBetween(0, 1024); + int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); for (int i = 0; i < operations; i++) { if (!rarely()) { tracker.markSeqNoAsCompleted(i); + maxSeqNo = i; } } @@ -251,11 +253,8 @@ public void testResetCheckpoint() { randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint())); tracker.resetCheckpoint(localCheckpoint); assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); - assertThat(tracker.getMaxSeqNo(), equalTo((long) localCheckpoint)); + assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); assertThat(tracker.processedSeqNo, empty()); - assertThat(tracker.generateSeqNo(), equalTo((long) (localCheckpoint + 1))); - tracker.markSeqNoAsCompleted((long) (localCheckpoint + 1)); - assertThat(tracker.processedSeqNo, not(empty())); - assertThat(tracker.processedSeqNo.peek().get(0), equalTo(true)); + assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); } } From f33925d0fb8ce27da5600cdc78e56562d664ad1b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 3 Jul 2017 16:05:45 -0400 Subject: [PATCH 05/16] Fix logging statement --- .../java/org/elasticsearch/index/shard/IndexShard.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index af4a0d53d8cf0..81c30cb6e83f9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2057,15 +2057,12 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final assert operationPrimaryTerm > primaryTerm : "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; + updateGlobalCheckpointOnReplica(globalCheckpoint); logger.trace( - "detected new primary with primary term [{}], " - + "resetting local checkpoint from [{}] to [{}], " - + "updating global checkpoint to [{}]", + "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", operationPrimaryTerm, getLocalCheckpoint(), - globalCheckpoint, - globalCheckpoint); - updateGlobalCheckpointOnReplica(globalCheckpoint); + getGlobalCheckpoint()); getEngine().seqNoService().resetLocalCheckpoint(getGlobalCheckpoint()); getEngine().getTranslog().rollGeneration(); }); From cbe568ba9fc0f4e7ae7de11e9c165633e3b81c5a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 3 Jul 2017 16:34:28 -0400 Subject: [PATCH 06/16] Revert accidental format changes --- .../index/seqno/LocalCheckpointTrackerTests.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index bfbb8c7440e82..f9343a40478a8 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -51,14 +51,14 @@ public class LocalCheckpointTrackerTests extends ESTestCase { public static LocalCheckpointTracker createEmptyTracker() { return new LocalCheckpointTracker( - IndexSettingsModule.newIndexSettings( - "test", - Settings - .builder() - .put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) - .build()), - SequenceNumbersService.NO_OPS_PERFORMED, - SequenceNumbersService.NO_OPS_PERFORMED + IndexSettingsModule.newIndexSettings( + "test", + Settings + .builder() + .put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) + .build()), + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED ); } From 0174af4cceb5e541aca5f69d3d4e57eea1d39fdb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 3 Jul 2017 16:47:03 -0400 Subject: [PATCH 07/16] Remove import --- .../elasticsearch/index/seqno/LocalCheckpointTrackerTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index f9343a40478a8..e2978ffc51d52 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -41,7 +41,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; -import static org.hamcrest.Matchers.not; public class LocalCheckpointTrackerTests extends ESTestCase { From 385c94884d202c91bfba1a920e203e8c7f9df73e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 10:20:42 -0400 Subject: [PATCH 08/16] Add assertion --- .../org/elasticsearch/index/seqno/LocalCheckpointTracker.java | 1 + core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 867f2241416b6..9af9f00b1d120 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -127,6 +127,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { * @param checkpoint the local checkpoint to reset this tracker to */ synchronized void resetCheckpoint(final long checkpoint) { + assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); firstProcessedSeqNo = checkpoint + 1; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 81c30cb6e83f9..4f7720d44aa67 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2057,6 +2057,7 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final assert operationPrimaryTerm > primaryTerm : "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; + assert globalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; updateGlobalCheckpointOnReplica(globalCheckpoint); logger.trace( "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", From 93e751f2321cfe9bc7c27dd494516f67a90bc272 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 10:48:16 -0400 Subject: [PATCH 09/16] Fix test --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ec2b7b26996bd..d1b0aeb405387 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -738,6 +739,11 @@ public void onFailure(Exception e) { public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { final IndexShard indexShard = newStartedShard(false); + /* + * When a shard recovers from a primary, it will advance its global checkpoint. We simulate that here; since we are not performing + * any operations, we initialize to no operations performed as the local checkpoint on this shard carries the same value. + */ + indexShard.updateGlobalCheckpointOnReplica(SequenceNumbersService.NO_OPS_PERFORMED); final CyclicBarrier barrier = new CyclicBarrier(3); final CountDownLatch latch = new CountDownLatch(2); From e8e4544a465f2bdf55c0968cede91ba9f0981768 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 11:07:34 -0400 Subject: [PATCH 10/16] Fix tests --- .../elasticsearch/index/shard/IndexShardTests.java | 14 +++++++++----- .../index/shard/IndexShardTestCase.java | 8 +++++++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d1b0aeb405387..b09b0e6c83662 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -275,7 +275,7 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu // expected } try { - indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.UNASSIGNED_SEQ_NO, null, + indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.NO_OPS_PERFORMED, null, ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { @@ -287,7 +287,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX)); + SequenceNumbersService.NO_OPS_PERFORMED, null, ThreadPool.Names.INDEX)); closeShards(indexShard); } @@ -509,6 +509,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { case 0: // started replica indexShard = newStartedShard(false); + indexShard.updateGlobalCheckpointOnReplica(SequenceNumbersService.NO_OPS_PERFORMED); engineClosed = false; break; case 1: { @@ -530,6 +531,9 @@ public void testOperationPermitOnReplicaShards() throws Exception { routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); IndexShardTestCase.updateRoutingEntry(indexShard, routing); + indexShard.updateLocalCheckpointForShard( + indexShard.routingEntry().allocationId().getId(), + SequenceNumbersService.NO_OPS_PERFORMED); indexShard.relocated("test", primaryContext -> {}); engineClosed = false; break; @@ -580,7 +584,7 @@ public void onFailure(Exception e) { } }; - indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, onLockAcquired, + indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.NO_OPS_PERFORMED, onLockAcquired, ThreadPool.Names.INDEX); assertFalse(onResponse.get()); @@ -597,11 +601,11 @@ public void onFailure(Exception e) { final long newPrimaryTerm = primaryTerm + 1 + randomInt(20); if (engineClosed == false) { assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); } final long newGlobalCheckPoint; if (engineClosed || randomBoolean()) { - newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + newGlobalCheckPoint = SequenceNumbersService.NO_OPS_PERFORMED; } else { long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100); // advance local checkpoint diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index e9e58ef5127fb..937e95f02e42d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -359,7 +359,13 @@ protected void recoveryShardFromStore(IndexShard primary) throws IOException { } public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRouting) throws IOException { - shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, 0L, Collections.emptySet(), Collections.emptySet()); + final Set activeAllocationIds; + if (shardRouting.primary()) { + activeAllocationIds = Collections.singleton(shardRouting.allocationId().getId()); + } else { + activeAllocationIds = Collections.emptySet(); + } + shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, 0L, activeAllocationIds, Collections.emptySet()); } protected void recoveryEmptyReplica(IndexShard replica) throws IOException { From 49742d4d2a26c283c84b03b637462dc196938604 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 11:12:03 -0400 Subject: [PATCH 11/16] Revert "Fix tests" This reverts commit e8e4544a465f2bdf55c0968cede91ba9f0981768. --- .../elasticsearch/index/shard/IndexShardTests.java | 14 +++++--------- .../index/shard/IndexShardTestCase.java | 8 +------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b09b0e6c83662..d1b0aeb405387 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -275,7 +275,7 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu // expected } try { - indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.NO_OPS_PERFORMED, null, + indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { @@ -287,7 +287,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbersService.NO_OPS_PERFORMED, null, ThreadPool.Names.INDEX)); + SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX)); closeShards(indexShard); } @@ -509,7 +509,6 @@ public void testOperationPermitOnReplicaShards() throws Exception { case 0: // started replica indexShard = newStartedShard(false); - indexShard.updateGlobalCheckpointOnReplica(SequenceNumbersService.NO_OPS_PERFORMED); engineClosed = false; break; case 1: { @@ -531,9 +530,6 @@ public void testOperationPermitOnReplicaShards() throws Exception { routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.updateLocalCheckpointForShard( - indexShard.routingEntry().allocationId().getId(), - SequenceNumbersService.NO_OPS_PERFORMED); indexShard.relocated("test", primaryContext -> {}); engineClosed = false; break; @@ -584,7 +580,7 @@ public void onFailure(Exception e) { } }; - indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.NO_OPS_PERFORMED, onLockAcquired, + indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, onLockAcquired, ThreadPool.Names.INDEX); assertFalse(onResponse.get()); @@ -601,11 +597,11 @@ public void onFailure(Exception e) { final long newPrimaryTerm = primaryTerm + 1 + randomInt(20); if (engineClosed == false) { assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); } final long newGlobalCheckPoint; if (engineClosed || randomBoolean()) { - newGlobalCheckPoint = SequenceNumbersService.NO_OPS_PERFORMED; + newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; } else { long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100); // advance local checkpoint diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 937e95f02e42d..e9e58ef5127fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -359,13 +359,7 @@ protected void recoveryShardFromStore(IndexShard primary) throws IOException { } public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRouting) throws IOException { - final Set activeAllocationIds; - if (shardRouting.primary()) { - activeAllocationIds = Collections.singleton(shardRouting.allocationId().getId()); - } else { - activeAllocationIds = Collections.emptySet(); - } - shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, 0L, activeAllocationIds, Collections.emptySet()); + shard.updateShardState(shardRouting, shard.getPrimaryTerm(), null, 0L, Collections.emptySet(), Collections.emptySet()); } protected void recoveryEmptyReplica(IndexShard replica) throws IOException { From 5064e8c2b1eddbcbdba4202e6e7990cedb6561e9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 11:12:09 -0400 Subject: [PATCH 12/16] Revert "Fix test" This reverts commit 93e751f2321cfe9bc7c27dd494516f67a90bc272. --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d1b0aeb405387..ec2b7b26996bd 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -80,7 +80,6 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -739,11 +738,6 @@ public void onFailure(Exception e) { public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { final IndexShard indexShard = newStartedShard(false); - /* - * When a shard recovers from a primary, it will advance its global checkpoint. We simulate that here; since we are not performing - * any operations, we initialize to no operations performed as the local checkpoint on this shard carries the same value. - */ - indexShard.updateGlobalCheckpointOnReplica(SequenceNumbersService.NO_OPS_PERFORMED); final CyclicBarrier barrier = new CyclicBarrier(3); final CountDownLatch latch = new CountDownLatch(2); From 5640e2a90c22f2c77a4069edb0239c8f0361e66b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 11:12:15 -0400 Subject: [PATCH 13/16] Revert "Add assertion" This reverts commit 385c94884d202c91bfba1a920e203e8c7f9df73e. --- .../org/elasticsearch/index/seqno/LocalCheckpointTracker.java | 1 - core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 9af9f00b1d120..867f2241416b6 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -127,7 +127,6 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { * @param checkpoint the local checkpoint to reset this tracker to */ synchronized void resetCheckpoint(final long checkpoint) { - assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); firstProcessedSeqNo = checkpoint + 1; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4f7720d44aa67..81c30cb6e83f9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2057,7 +2057,6 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final assert operationPrimaryTerm > primaryTerm : "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; - assert globalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; updateGlobalCheckpointOnReplica(globalCheckpoint); logger.trace( "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", From d1e0ec2751d83a5c263615498929d2ae29cb1078 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 12:20:45 -0400 Subject: [PATCH 14/16] Special case --- .../index/seqno/LocalCheckpointTracker.java | 1 + .../org/elasticsearch/index/shard/IndexShard.java | 11 +++++++++-- .../elasticsearch/index/shard/IndexShardTests.java | 14 ++++++++++---- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 867f2241416b6..9af9f00b1d120 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -127,6 +127,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { * @param checkpoint the local checkpoint to reset this tracker to */ synchronized void resetCheckpoint(final long checkpoint) { + assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); firstProcessedSeqNo = checkpoint + 1; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 81c30cb6e83f9..021f37d45712a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2058,12 +2058,19 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; updateGlobalCheckpointOnReplica(globalCheckpoint); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long localCheckpoint; + if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } else { + localCheckpoint = currentGlobalCheckpoint; + } logger.trace( "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", operationPrimaryTerm, getLocalCheckpoint(), - getGlobalCheckpoint()); - getEngine().seqNoService().resetLocalCheckpoint(getGlobalCheckpoint()); + localCheckpoint); + getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint); getEngine().getTranslog().rollGeneration(); }); globalCheckpointUpdated = true; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ec2b7b26996bd..2b446d559aef9 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -699,15 +700,15 @@ public void testThrowBackLocalCheckpointOnReplica() throws IOException, Interrup final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); - final long globalCheckpointOnReplica = + final long globalCheckpointOnReplica = SequenceNumbersService.UNASSIGNED_SEQ_NO; randomIntBetween( - Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), + Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), Math.toIntExact(indexShard.getLocalCheckpoint())); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica); final int globalCheckpoint = randomIntBetween( - Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), + Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), Math.toIntExact(indexShard.getLocalCheckpoint())); final CountDownLatch latch = new CountDownLatch(1); indexShard.acquireReplicaOperationPermit( @@ -727,7 +728,12 @@ public void onFailure(Exception e) { ThreadPool.Names.SAME); latch.await(); - assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); + if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO + && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + } else { + assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); + } // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); From e1bc5c783dbb2aaba0e032ffd066daa424ac6456 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 15:25:27 -0400 Subject: [PATCH 15/16] Fix test --- .../org/elasticsearch/index/shard/IndexShardTests.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2b446d559aef9..8673fc6110087 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -610,6 +610,12 @@ public void onFailure(Exception e) { } newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint); } + final long expectedLocalCheckpoint; + if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } else { + expectedLocalCheckpoint = newGlobalCheckPoint; + } // but you can not increment with a new primary term until the operations on the older primary term complete final Thread thread = new Thread(() -> { try { @@ -621,7 +627,7 @@ public void onFailure(Exception e) { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); releasable.close(); @@ -682,7 +688,7 @@ private void finish() { assertTrue(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); - assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } } From 6d67289122d5916f3ee4fb60d3139d6507558750 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Jul 2017 16:15:59 -0400 Subject: [PATCH 16/16] Cleanup --- .../org/elasticsearch/index/shard/IndexShardTests.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8673fc6110087..a838681c901e7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -706,7 +706,7 @@ public void testThrowBackLocalCheckpointOnReplica() throws IOException, Interrup final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); - final long globalCheckpointOnReplica = SequenceNumbersService.UNASSIGNED_SEQ_NO; + final long globalCheckpointOnReplica = randomIntBetween( Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), Math.toIntExact(indexShard.getLocalCheckpoint())); @@ -722,12 +722,13 @@ public void testThrowBackLocalCheckpointOnReplica() throws IOException, Interrup globalCheckpoint, new ActionListener() { @Override - public void onResponse(Releasable releasable) { + public void onResponse(final Releasable releasable) { + releasable.close(); latch.countDown(); } @Override - public void onFailure(Exception e) { + public void onFailure(final Exception e) { } },