Skip to content

Commit

Permalink
Fix new added replica shards falling behind primary.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Nov 22, 2022
1 parent 7dc137f commit f3783f8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
Expand Down Expand Up @@ -53,6 +55,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -194,6 +197,48 @@ public void testCancelPrimaryAllocation() throws Exception {
assertSegmentStats(REPLICA_COUNT);
}

/**
* This test adds a new replica shard to an existing cluster which already has few docs inserted before adding replica.
* We don't perform any refresh on index and assert new replica shard on doc hit count.
* This test makes sure that when a new replica is added to an existing cluster it gets all latest segments from primary even without a refresh.
*/
public void testAddNewReplica() throws Exception {
logger.info("--> starting [node1] ...");
final String node_1 = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(INDEX_NAME, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

logger.info("--> verifying count");
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start another node");
final String node_2 = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
waitForReplicaUpdate();
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource.Type;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.RecoverySource.Type;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -82,8 +83,10 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
Expand Down Expand Up @@ -143,6 +146,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;

private final SegmentReplicationTargetService segmentReplicationTargetService;

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

@Inject
Expand Down Expand Up @@ -217,6 +222,7 @@ public IndicesClusterStateService(
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
}
this.segmentReplicationTargetService = segmentReplicationTargetService;
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
this.clusterService = clusterService;
Expand Down Expand Up @@ -774,7 +780,52 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea

public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState RecState = (RecoveryState) state;
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
// For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before it
// is marked as Started.
if (indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& ShardRoutingState.RELOCATING != shardRouting.state()) {
segmentReplicationTargetService.startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + RecState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
);
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
}
});
} else {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

}

private void failAndRemoveShard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public SegmentReplicationTarget startReplication(
return target;
}

public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
return startReplication(ReplicationCheckpoint.empty(indexShard.shardId()), indexShard, listener);
}

// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
Expand Down

0 comments on commit f3783f8

Please sign in to comment.