diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index ac6754bf6a74a..96d74bea85920 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh) { + if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard); } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 57c2289c848ef..a0e84e2b4085a 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -206,7 +206,10 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.oneOf; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -728,9 +731,9 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000); CountDownLatch latch = new CountDownLatch(1); indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { - assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); - latch.countDown(); - }, + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); + latch.countDown(); + }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build() @@ -3522,7 +3525,43 @@ public void testCheckpointRefreshListenerWithNull() throws IOException { } /** - * creates a new initializing shard. The shard will will be put in its proper path under the + * here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh. + */ + public void testPublishCheckpointOnPrimaryMode() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(p -> newShard(mock), true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + refreshListener.afterRefresh(true); + + // verify checkpoint is published + verify(mock, times(1)).publish(any()); + closeShards(shard); + } + + /** + * here we are starting a new primary shard in PrimaryMode intially and starting relocation handoff. Later we complete relocation handoff then shard is no longer + * in PrimaryMode, and we test if the shard does not publish checkpoint after refresh. + */ + public void testPublishCheckpointAfterRelocationHandOff() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(p -> newShard(mock), true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + String id = shard.routingEntry().allocationId().getId(); + + // Starting relocation handoff + shard.getReplicationTracker().startRelocationHandoff(id); + + // Completing relocation handoff + shard.getReplicationTracker().completeRelocationHandoff(); + refreshListener.afterRefresh(true); + + // verify checkpoint is not published + verify(mock, times(0)).publish(any()); + closeShards(shard); + } + + /** + * creates a new initializing shard. The shard will be put in its proper path under the * current node id the shard is assigned to. * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint */