From e0dcca6084446b0a2d6ecea848160d419999acd8 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 8 Aug 2022 06:12:41 +0000 Subject: [PATCH 1/8] Adding PrimaryMode check before publishing checkpoint. Signed-off-by: Rishikesh1159 --- .../shard/CheckpointRefreshListener.java | 2 +- .../index/shard/IndexShardTests.java | 47 +++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index ac6754bf6a74a..96d74bea85920 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh) { + if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard); } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 57c2289c848ef..a0e84e2b4085a 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -206,7 +206,10 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -728,9 +731,9 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000); CountDownLatch latch = new CountDownLatch(1); indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { - assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); - latch.countDown(); - }, + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); + latch.countDown(); + }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build() @@ -3522,7 +3525,43 @@ public void testCheckpointRefreshListenerWithNull() throws IOException { } /** - * creates a new initializing shard. The shard will will be put in its proper path under the + * here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh. + */ + public void testPublishCheckpointOnPrimaryMode() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(p -> newShard(mock), true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + refreshListener.afterRefresh(true); + + // verify checkpoint is published + verify(mock, times(1)).publish(any()); + closeShards(shard); + } + + /** + * here we are starting a new primary shard in PrimaryMode intially and starting relocation handoff. Later we complete relocation handoff then shard is no longer + * in PrimaryMode, and we test if the shard does not publish checkpoint after refresh. + */ + public void testPublishCheckpointAfterRelocationHandOff() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(p -> newShard(mock), true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + String id = shard.routingEntry().allocationId().getId(); + + // Starting relocation handoff + shard.getReplicationTracker().startRelocationHandoff(id); + + // Completing relocation handoff + shard.getReplicationTracker().completeRelocationHandoff(); + refreshListener.afterRefresh(true); + + // verify checkpoint is not published + verify(mock, times(0)).publish(any()); + closeShards(shard); + } + + /** + * creates a new initializing shard. The shard will be put in its proper path under the * current node id the shard is assigned to. * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint */ From c048700b5fa28a8ff21a3aa77c19aa3f331c624a Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 8 Aug 2022 06:26:54 +0000 Subject: [PATCH 2/8] Applying spotless check Signed-off-by: Rishikesh1159 --- .../java/org/opensearch/index/shard/IndexShardTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index a0e84e2b4085a..65cecc2937384 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -731,9 +731,9 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000); CountDownLatch latch = new CountDownLatch(1); indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { - assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); - latch.countDown(); - }, + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); + latch.countDown(); + }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build() From 31db9c7a4be3d27b23b03002059c1ae34d9f8c1f Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 8 Aug 2022 16:52:18 +0000 Subject: [PATCH 3/8] Moving segrep specific tests to SegmentReplicationIndexShardTests. Signed-off-by: Rishikesh1159 --- .../index/shard/IndexShardTests.java | 39 ---------------- .../SegmentReplicationIndexShardTests.java | 44 +++++++++++++++++++ 2 files changed, 44 insertions(+), 39 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 65cecc2937384..8c00ab97a46ea 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -206,10 +206,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -3524,42 +3521,6 @@ public void testCheckpointRefreshListenerWithNull() throws IOException { closeShards(shard); } - /** - * here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh. - */ - public void testPublishCheckpointOnPrimaryMode() throws IOException { - final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); - IndexShard shard = newStartedShard(p -> newShard(mock), true); - CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); - refreshListener.afterRefresh(true); - - // verify checkpoint is published - verify(mock, times(1)).publish(any()); - closeShards(shard); - } - - /** - * here we are starting a new primary shard in PrimaryMode intially and starting relocation handoff. Later we complete relocation handoff then shard is no longer - * in PrimaryMode, and we test if the shard does not publish checkpoint after refresh. - */ - public void testPublishCheckpointAfterRelocationHandOff() throws IOException { - final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); - IndexShard shard = newStartedShard(p -> newShard(mock), true); - CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); - String id = shard.routingEntry().allocationId().getId(); - - // Starting relocation handoff - shard.getReplicationTracker().startRelocationHandoff(id); - - // Completing relocation handoff - shard.getReplicationTracker().completeRelocationHandoff(); - refreshListener.afterRefresh(true); - - // verify checkpoint is not published - verify(mock, times(0)).publish(any()); - closeShards(shard); - } - /** * creates a new initializing shard. The shard will be put in its proper path under the * current node id the shard is assigned to. diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 3fcf6116b11a2..c773fe8a7bc06 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -14,8 +14,16 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() @@ -56,4 +64,40 @@ public void testIgnoreShardIdle() throws Exception { replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); } } + + /** + * here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh. + */ + public void testPublishCheckpointOnPrimaryMode() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + refreshListener.afterRefresh(true); + + // verify checkpoint is published + verify(mock, times(1)).publish(any()); + closeShards(shard); + } + + /** + * here we are starting a new primary shard in PrimaryMode initially and starting relocation handoff. Later we complete relocation handoff then shard is no longer + * in PrimaryMode, and we test if the shard does not publish checkpoint after refresh. + */ + public void testPublishCheckpointAfterRelocationHandOff() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + String id = shard.routingEntry().allocationId().getId(); + + // Starting relocation handoff + shard.getReplicationTracker().startRelocationHandoff(id); + + // Completing relocation handoff + shard.getReplicationTracker().completeRelocationHandoff(); + refreshListener.afterRefresh(true); + + // verify checkpoint is not published + verify(mock, times(0)).publish(any()); + closeShards(shard); + } } From a7dfbb8d182c43834a352a75d5f4b3e6c0912925 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 10 Aug 2022 05:09:18 +0000 Subject: [PATCH 4/8] Adding logic and tests for rejecting checkpoints if shard is in PrimaryMode. Signed-off-by: Rishikesh1159 --- .../org/opensearch/index/shard/IndexShard.java | 8 ++++++++ .../SegmentReplicationTargetServiceTests.java | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b05eec4304cd5..c90129b4d1cca 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1431,6 +1431,14 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); return false; } + if (getReplicationTracker().isPrimaryMode()) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints." + ) + ); + return false; + } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 8b4bda7de50ad..3c3a09ce4df12 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -204,6 +204,23 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc closeShard(indexShard, false); } + /** + * here we are starting a new shard in PrimaryMode and testing that we don't process a checkpoint on shard when it is in PrimaryMode. + */ + public void testRejectCheckpointOnShardPrimaryMode() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + + // Starting a new shard in PrimaryMode. + IndexShard primaryShard = newStartedShard(true); + IndexShard spyShard = spy(primaryShard); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(aheadCheckpoint, spyShard); + + // Verify that checkpoint is not processed as shard is in PrimaryMode. + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(primaryShard); + } + public void testReplicationOnDone() throws IOException { SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(indexShard); From 9f2be069ce99c5181f4efdc5651865ee6313d73d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 10 Aug 2022 05:52:27 +0000 Subject: [PATCH 5/8] Applying ./gradlew :server:spotlessApply. Signed-off-by: Rishikesh1159 --- .../index/shard/SegmentReplicationIndexShardTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index c773fe8a7bc06..dd83bc50be9f4 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -100,4 +100,5 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { verify(mock, times(0)).publish(any()); closeShards(shard); } + } From 226f1c03729b52986a4056e799efa21d66a66b1c Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 10 Aug 2022 05:58:01 +0000 Subject: [PATCH 6/8] Applying ./gradlew :server:spotlessApply Signed-off-by: Rishikesh1159 --- .../index/shard/SegmentReplicationIndexShardTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 1aadcf8ff7d95..d10f8ced963b7 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; - public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() From 66fbfcb91d16e8b66d419ad1317f31b7a70cf183 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 10 Aug 2022 14:10:25 +0000 Subject: [PATCH 7/8] Changing log level to warn in shouldProcessCheckpoint() of IndexShard.java class. Signed-off-by: Rishikesh1159 --- server/src/main/java/org/opensearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 33d854f4f1cb3..c69bdacd1b6f7 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1435,7 +1435,7 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.trace( + logger.warn( () -> new ParameterizedMessage( "Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints." ) From 4989d2b4be74a53f29b8188071f09d68878dc8e0 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 15 Aug 2022 16:42:20 +0000 Subject: [PATCH 8/8] Removing unnecessary lazy logging in shouldProcessCheckpoint(). Signed-off-by: Rishikesh1159 --- .../main/java/org/opensearch/index/shard/IndexShard.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c69bdacd1b6f7..ac259f9430a3d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1435,11 +1435,7 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.warn( - () -> new ParameterizedMessage( - "Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints." - ) - ); + logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();