diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 43bbb81b7a046..8a880ddf2b33b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -56,7 +56,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -220,7 +219,57 @@ public void testCancelPrimaryAllocation() throws Exception { } /** - * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. + * This test verifies that adding a new node which results in peer recovery as replica; also bring replica's + * replication checkpoint upto the primary's by performing a round of segment replication. + */ + public void testNewlyAddedReplicaIsUpdated() { + internalCluster().startNode(featureFlagSettings()); + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush so we have some segment files on disk"); + flush(INDEX_NAME); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + refresh(INDEX_NAME); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L); + + logger.info("--> start empty node to add replica shard"); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); + ensureGreen(INDEX_NAME); + // Update replica count settings to 1 so that peer recovery triggers and recover replicaNode + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(2)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + ensureYellow(INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode); + assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME))); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L); + } + + /** + * This test verifies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. * * TODO: Ignoring this test as its flaky and needs separate fix */ @@ -247,7 +296,7 @@ public void testAddNewReplicaFailure() throws Exception { } refresh(INDEX_NAME); logger.info("--> verifying count"); - assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L); logger.info("--> start empty node to add replica shard"); final String replicaNode = internalCluster().startNode(featureFlagSettings()); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e8adcbdc1c89a..966e2168e263c 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; -import org.opensearch.action.StepListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; @@ -83,11 +82,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationSourceService; -import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTargetService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; @@ -781,78 +777,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { RecoveryState recoveryState = (RecoveryState) state; - AllocatedIndex indexService = indicesService.indexService(shardRouting.shardId().getIndex()); - StepListener forceSegRepListener = new StepListener<>(); - // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before - // it is marked as Started. - if (indexService.getIndexSettings().isSegRepEnabled()) { - forceSegmentReplication(indexService, shardRouting, forceSegRepListener); - } else { - forceSegRepListener.onResponse(null); - } - forceSegRepListener.whenComplete( - v -> shardStateAction.shardStarted( - shardRouting, - primaryTerm, - "after " + recoveryState.getRecoverySource(), - SHARD_STATE_ACTION_LISTENER - ), - e -> handleRecoveryFailure(shardRouting, true, e) - ); - } - - /** - * Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary. - */ - private void forceSegmentReplication( - AllocatedIndex indexService, - ShardRouting shardRouting, - StepListener forceSegRepListener - ) { - IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null && indexShard.isSegmentReplicationAllowed()) { - segmentReplicationTargetService.startReplication( - ReplicationCheckpoint.empty(shardRouting.shardId()), - indexShard, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - forceSegRepListener.onResponse(null); - } - - @Override - public void onReplicationFailure( - SegmentReplicationState state, - ReplicationFailedException e, - boolean sendShardFailure - ) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - if (sendShardFailure == true) { - logger.error("replication failure", e); - indexShard.failShard("replication failure", e); - } - forceSegRepListener.onFailure(e); - } - } - ); - } else { - forceSegRepListener.onResponse(null); - } + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } private void failAndRemoveShard( diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 276821dfb09b4..75691632f2252 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -839,7 +839,12 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * target are failed (see {@link IndexShard#updateRoutingEntry}). */ } else { - handoffListener.onResponse(null); + // Force round of segment replication to update its checkpoint to primary's + if (shard.indexSettings().isSegRepEnabled()) { + recoveryTarget.forceSegmentFileSync(handoffListener); + } else { + handoffListener.onResponse(null); + } } handoffListener.whenComplete(res -> { stopWatch.stop(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 85a34878af03f..c8ebae25bd4cb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -339,7 +339,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha } } - class ForceSyncTransportRequestHandler implements TransportRequestHandler { + private class ForceSyncTransportRequestHandler implements TransportRequestHandler { @Override public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { assert indicesService != null; @@ -359,7 +359,10 @@ public void onReplicationDone(SegmentReplicationState state) { ) ); try { - indexShard.resetToWriteableEngine(); + // Promote engine type for primary target + if (indexShard.recoveryState().getPrimary() == true) { + indexShard.resetToWriteableEngine(); + } channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (InterruptedException | TimeoutException | IOException e) { throw new RuntimeException(e); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 44771faf36871..3c0c5f1a918c0 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -582,6 +582,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. final int numDocs = shards.indexDocs(randomInt(10)); + logger.info("--> numDocs {}", numDocs); // refresh and copy the segments over. oldPrimary.refresh("Test"); @@ -597,6 +598,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs // persisted. final int additonalDocs = shards.indexDocs(randomInt(10)); + logger.info("--> additonalDocs {}", additonalDocs); final int totalDocs = numDocs + additonalDocs; assertDocCounts(oldPrimary, totalDocs, totalDocs); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index cde5cd980a91d..b5d8b2baf40dc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -20,6 +20,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.StoreFileMetadata; @@ -51,7 +52,7 @@ public void setUp() throws Exception { super.setUp(); final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); primary = newStartedShard(true, settings); - replica = newShard(primary.shardId(), false); + replica = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 25625fbf68a64..de6999113a00c 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -13,7 +13,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.engine.NRTReplicationEngineFactory; @@ -62,10 +61,18 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); - final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); - recoverReplica(replicaShard, primaryShard, true); + recoverReplica(replicaShard, primaryShard, true, (shardList, listener) -> { + try { + assert shardList.size() >= 2; + final IndexShard primary = shardList.get(0); + return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + } catch (IOException | InterruptedException e) { + listener.onFailure(e); + throw new RuntimeException(e); + } + }); checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index ad19473380063..8df57ccad85cc 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -545,7 +545,7 @@ public void recoverReplica( markAsRecovering, inSyncIds, routingTable, - (a, b) -> null + getReplicationFunc(replica) ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); computeReplicationTargets(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 3ae79a8a17879..a2ee5a84dbf85 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -838,7 +838,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) { } protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, startReplica, (a, b) -> null); + recoverReplica(replica, primary, startReplica, getReplicationFunc(replica)); } /** recovers a replica from the given primary **/ @@ -868,6 +868,19 @@ protected void recoverReplica( recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a, b) -> null); } + public BiFunction, ActionListener, List> getReplicationFunc(final IndexShard target) { + return target.indexSettings().isSegRepEnabled() ? (shardList, listener) -> { + try { + assert shardList.size() >= 2; + final IndexShard primary = shardList.get(0); + return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + } catch (IOException | InterruptedException e) { + listener.onFailure(e); + throw new RuntimeException(e); + } + } : (a, b) -> null; + } + /** recovers a replica from the given primary **/ protected void recoverReplica( final IndexShard replica,