From f3783f8d940c332021b7bac2b702362be54bb0d3 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 22 Nov 2022 03:38:37 +0000 Subject: [PATCH] Fix new added replica shards falling behind primary. Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 45 ++++++++++++++ .../cluster/IndicesClusterStateService.java | 59 +++++++++++++++++-- .../SegmentReplicationTargetService.java | 4 ++ 3 files changed, 104 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 2ceb4e0908df3..c6d09e060803c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.BeforeClass; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; @@ -24,6 +25,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.common.Nullable; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; @@ -53,6 +55,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -194,6 +197,48 @@ public void testCancelPrimaryAllocation() throws Exception { assertSegmentStats(REPLICA_COUNT); } + /** + * This test adds a new replica shard to an existing cluster which already has few docs inserted before adding replica. + * We don't perform any refresh on index and assert new replica shard on doc hit count. + * This test makes sure that when a new replica is added to an existing cluster it gets all latest segments from primary even without a refresh. + */ + public void testAddNewReplica() throws Exception { + logger.info("--> starting [node1] ..."); + final String node_1 = internalCluster().startNode(); + + logger.info("--> creating test index ..."); + prepareCreate(INDEX_NAME, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have an actual index"); + client().admin().indices().prepareFlush().execute().actionGet(); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + + logger.info("--> verifying count"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + + logger.info("--> start another node"); + final String node_2 = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .setWaitForGreenStatus() + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + waitForReplicaUpdate(); + assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + } + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 15a9bf9e4c492..f09700e08cfb2 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -45,11 +45,12 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.RecoverySource.Type; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.component.AbstractLifecycleComponent; @@ -82,8 +83,10 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationSourceService; +import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; @@ -143,6 +146,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final Consumer globalCheckpointSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; + private final SegmentReplicationTargetService segmentReplicationTargetService; + private final SegmentReplicationCheckpointPublisher checkpointPublisher; @Inject @@ -217,6 +222,7 @@ public IndicesClusterStateService( indexEventListeners.add(segmentReplicationTargetService); indexEventListeners.add(segmentReplicationSourceService); } + this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); this.indicesService = indicesService; this.clusterService = clusterService; @@ -774,7 +780,52 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState RecState = (RecoveryState) state; - shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + AllocatedIndex indexService = indicesService.indexService(shardRouting.shardId().getIndex()); + IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); + // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before it + // is marked as Started. + if (indexShard.indexSettings().isSegRepEnabled() + && shardRouting.primary() == false + && ShardRoutingState.RELOCATING != shardRouting.state()) { + segmentReplicationTargetService.startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "after " + RecState.getRecoverySource(), + SHARD_STATE_ACTION_LISTENER + ); + } + + @Override + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + logger.error("replication failure", e); + indexShard.failShard("replication failure", e); + } + } + }); + } else { + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + } + } private void failAndRemoveShard( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index b633f0fa3b9a0..64765484f2bfe 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -230,6 +230,10 @@ public SegmentReplicationTarget startReplication( return target; } + public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) { + return startReplication(ReplicationCheckpoint.empty(indexShard.shardId()), indexShard, listener); + } + // pkg-private for integration tests void startReplication(final SegmentReplicationTarget target) { final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());