diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ca3deb7c441f..6fb4251584939 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) - Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494)) +- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java new file mode 100644 index 0000000000000..374743a8e4a17 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.settings; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1) +public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase { + + private static final String TEST_INDEX = "test_index"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.FALSE) + .build(); + } + + public void testCreateFeatureFlagDisabled() { + Settings settings = Settings.builder().put(indexSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, false).build(); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> createIndex(TEST_INDEX, settings)); + assertTrue(exception.getMessage().contains("unknown setting")); + } + + public void testUpdateFeatureFlagDisabled() { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + createIndex(TEST_INDEX, settings); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { + client().admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) + .get(); + }); + assertTrue(exception.getMessage().contains("unknown setting")); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java new file mode 100644 index 0000000000000..5fc8e30ed2c7a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java @@ -0,0 +1,210 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.settings; + +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase { + + private static final String TEST_INDEX = "test_index"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); + } + + private final String expectedFailureMessage = + "To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT"; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0ms") // so that after we punt a node we can immediately try to + // reallocate after node left. + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + public void testCreateDocRepFails() { + Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + + IllegalArgumentException illegalArgumentException = expectThrows( + IllegalArgumentException.class, + () -> createIndex(TEST_INDEX, settings) + ); + assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); + } + + public void testUpdateDocRepFails() { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + .build(); + // create succeeds + createIndex(TEST_INDEX, settings); + + // update fails + IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> { + client().admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) + .get(); + }); + assertEquals(expectedFailureMessage, illegalArgumentException.getMessage()); + } + + public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException { + int numSearchReplicas = 1; + int numWriterReplicas = 1; + internalCluster().startClusterManagerOnlyNode(); + String primaryNodeName = internalCluster().startDataOnlyNode(); + createIndex( + TEST_INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas) + .build() + ); + ensureYellow(TEST_INDEX); + // add 2 nodes for the replicas + internalCluster().startDataOnlyNodes(2); + ensureGreen(TEST_INDEX); + + // assert shards are on separate nodes & all active + assertActiveShardCounts(numSearchReplicas, numWriterReplicas); + + // stop the primary and ensure search shard is not promoted: + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName)); + ensureYellowAndNoInitializingShards(TEST_INDEX); + + assertActiveShardCounts(numSearchReplicas, 0); // 1 repl is inactive that was promoted to primary + // add back a node + internalCluster().startDataOnlyNode(); + ensureGreen(TEST_INDEX); + + } + + public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException { + int numSearchReplicas = 1; + int numWriterReplicas = 0; + internalCluster().startClusterManagerOnlyNode(); + String primaryNodeName = internalCluster().startDataOnlyNode(); + createIndex( + TEST_INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas) + .build() + ); + ensureYellow(TEST_INDEX); + client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + // start a node for our search replica + String replica = internalCluster().startDataOnlyNode(); + ensureGreen(TEST_INDEX); + assertActiveSearchShards(numSearchReplicas); + assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1); + + // stop the primary and ensure search shard is not promoted: + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName)); + ensureRed(TEST_INDEX); + assertActiveSearchShards(numSearchReplicas); + // while red our search shard is still searchable + assertHitCount(client(replica).prepareSearch(TEST_INDEX).setSize(0).setPreference("_only_local").get(), 1); + } + + public void testSearchReplicaScaling() { + internalCluster().startNodes(2); + createIndex(TEST_INDEX); + ensureGreen(TEST_INDEX); + // assert settings + Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata(); + int numSearchReplicas = Integer.parseInt(metadata.index(TEST_INDEX).getSettings().get(SETTING_NUMBER_OF_SEARCH_REPLICAS)); + assertEquals(1, numSearchReplicas); + + // assert cluster state & routing table + assertActiveSearchShards(1); + + // Add another node and search replica + internalCluster().startDataOnlyNode(); + client().admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2)) + .get(); + + ensureGreen(TEST_INDEX); + assertActiveSearchShards(2); + + // remove all search shards + client().admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)) + .get(); + ensureGreen(TEST_INDEX); + assertActiveSearchShards(0); + } + + /** + * Helper to assert counts of active shards for each type. + */ + private void assertActiveShardCounts(int expectedSearchReplicaCount, int expectedWriteReplicaCount) { + // assert routing table + IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable(); + // assert search replica count + int activeCount = expectedSearchReplicaCount + expectedWriteReplicaCount; + assertEquals(expectedSearchReplicaCount, indexShardRoutingTable.searchOnlyReplicas().stream().filter(ShardRouting::active).count()); + assertEquals(expectedWriteReplicaCount, indexShardRoutingTable.writerReplicas().stream().filter(ShardRouting::active).count()); + assertEquals( + expectedWriteReplicaCount + expectedSearchReplicaCount, + indexShardRoutingTable.replicaShards().stream().filter(ShardRouting::active).count() + ); + + // assert routing nodes + ClusterState clusterState = getClusterState(); + assertEquals(activeCount, clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary()).size()); + assertEquals(expectedSearchReplicaCount, clusterState.getRoutingNodes().shards(r -> r.active() && r.isSearchOnly()).size()); + assertEquals( + expectedWriteReplicaCount, + clusterState.getRoutingNodes().shards(r -> r.active() && !r.primary() && !r.isSearchOnly()).size() + ); + } + + private void assertActiveSearchShards(int expectedSearchReplicaCount) { + assertActiveShardCounts(expectedSearchReplicaCount, 0); + } + + private IndexShardRoutingTable getIndexShardRoutingTable() { + return getClusterState().routingTable().index(TEST_INDEX).shards().values().stream().findFirst().get(); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 9e6eebaa3c44e..a9c54f2bab88e 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -69,6 +69,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; @@ -243,6 +244,22 @@ static Setting buildNumberOfShardsSetting() { Property.IndexScope ); + /** + * Setting to control the number of search only replicas for an index. + * A search only replica exists solely to perform read operations for a shard and are designed to achieve + * isolation from writers (primary shards). This means they are not primary eligible and do not have any direct communication + * with their primary. Search replicas require the use of Segment Replication on the index and poll their {@link SegmentReplicationSource} for + * updates. //TODO: Once physical isolation is introduced, reference the setting here. + */ + public static final String SETTING_NUMBER_OF_SEARCH_REPLICAS = "index.number_of_search_only_replicas"; + public static final Setting INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING = Setting.intSetting( + SETTING_NUMBER_OF_SEARCH_REPLICAS, + 0, + 0, + Property.Dynamic, + Property.IndexScope + ); + public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size"; public static final Setting INDEX_ROUTING_PARTITION_SIZE_SETTING = Setting.intSetting( SETTING_ROUTING_PARTITION_SIZE, @@ -650,6 +667,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final int numberOfShards; private final int numberOfReplicas; + private final int numberOfSearchOnlyReplicas; private final Index index; private final long version; @@ -701,6 +719,7 @@ private IndexMetadata( final State state, final int numberOfShards, final int numberOfReplicas, + final int numberOfSearchOnlyReplicas, final Settings settings, final Map mappings, final Map aliases, @@ -733,7 +752,8 @@ private IndexMetadata( this.state = state; this.numberOfShards = numberOfShards; this.numberOfReplicas = numberOfReplicas; - this.totalNumberOfShards = numberOfShards * (numberOfReplicas + 1); + this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas; + this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1); this.settings = settings; this.mappings = Collections.unmodifiableMap(mappings); this.customData = Collections.unmodifiableMap(customData); @@ -835,6 +855,10 @@ public int getNumberOfReplicas() { return numberOfReplicas; } + public int getNumberOfSearchOnlyReplicas() { + return numberOfSearchOnlyReplicas; + } + public int getRoutingPartitionSize() { return routingPartitionSize; } @@ -1348,6 +1372,11 @@ public Builder numberOfReplicas(int numberOfReplicas) { return this; } + public Builder numberOfSearchReplicas(int numberOfSearchReplicas) { + settings = Settings.builder().put(settings).put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas).build(); + return this; + } + public Builder routingPartitionSize(int routingPartitionSize) { settings = Settings.builder().put(settings).put(SETTING_ROUTING_PARTITION_SIZE, routingPartitionSize).build(); return this; @@ -1535,6 +1564,7 @@ public IndexMetadata build() { throw new IllegalArgumentException("must specify number of replicas for index [" + index + "]"); } final int numberOfReplicas = INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings); + final int numberOfSearchReplicas = INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(settings); int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings); if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) { @@ -1630,6 +1660,7 @@ public IndexMetadata build() { state, numberOfShards, numberOfReplicas, + numberOfSearchReplicas, tmpSettings, mappings, tmpAliases, diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index e9e6ebc8ccf6a..42739c4670ebc 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -1508,6 +1508,24 @@ public Builder updateNumberOfReplicas(final int numberOfReplicas, final String[] return this; } + /** + * Update the number of search replicas for the specified indices. + * + * @param numberOfSearchReplicas the number of search replicas + * @param indices the indices to update the number of replicas for + * @return the builder + */ + public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, final String[] indices) { + for (String index : indices) { + IndexMetadata indexMetadata = this.indices.get(index); + if (indexMetadata == null) { + throw new IndexNotFoundException(index); + } + put(IndexMetadata.builder(indexMetadata).numberOfSearchReplicas(numberOfSearchReplicas)); + } + return this; + } + public Builder coordinationMetadata(CoordinationMetadata coordinationMetadata) { this.coordinationMetadata = coordinationMetadata; return this; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 4cabda9b45a1e..97d3af9fd579c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -76,6 +76,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; @@ -134,12 +135,14 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; @@ -969,6 +972,9 @@ static Settings aggregateIndexSettings( updateReplicationStrategy(indexSettingsBuilder, request.settings(), settings, combinedTemplateSettings, clusterSettings); updateRemoteStoreSettings(indexSettingsBuilder, currentState, clusterSettings, settings, request.index()); + if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING)) { + updateSearchOnlyReplicas(request.settings(), indexSettingsBuilder); + } if (sourceMetadata != null) { assert request.resizeType() != null; @@ -1004,10 +1010,26 @@ static Settings aggregateIndexSettings( validateStoreTypeSettings(indexSettings); validateRefreshIntervalSettings(request.settings(), clusterSettings); validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings); - return indexSettings; } + private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.Builder builder) { + if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(builder) && builder.get(SETTING_NUMBER_OF_SEARCH_REPLICAS) != null) { + if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings) > 0 + && ReplicationType.parseString(builder.get(INDEX_REPLICATION_TYPE_SETTING.getKey())).equals(ReplicationType.DOCUMENT)) { + throw new IllegalArgumentException( + "To set " + + SETTING_NUMBER_OF_SEARCH_REPLICAS + + ", " + + INDEX_REPLICATION_TYPE_SETTING.getKey() + + " must be set to " + + ReplicationType.SEGMENT + ); + } + builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings)); + } + } + /** * Updates index settings to set replication strategy by default based on cluster level settings or remote store * node attributes diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 7d4c3512ed757..00b2ea0693b9d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -57,11 +57,13 @@ import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -73,6 +75,8 @@ import java.util.Set; import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings; import static org.opensearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX; @@ -260,6 +264,34 @@ public ClusterState execute(ClusterState currentState) { } } + if (IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(openSettings)) { + if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING)) { + validateSearchReplicaCountSettings(normalizedSettings, request.indices(), currentState); + } + final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(openSettings); + if (preserveExisting == false) { + // TODO: Honor awareness validation to search replicas. + + // Verify that this won't take us over the cluster shard limit. + int totalNewShards = Arrays.stream(request.indices()) + .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfSearchReplicas)) + .sum(); + Optional error = shardLimitValidator.checkShardLimit(totalNewShards, currentState); + if (error.isPresent()) { + ValidationException ex = new ValidationException(); + ex.addValidationError(error.get()); + throw ex; + } + routingTableBuilder.updateNumberOfSearchReplicas(updatedNumberOfSearchReplicas, actualIndices); + metadataBuilder.updateNumberOfSearchReplicas(updatedNumberOfSearchReplicas, actualIndices); + logger.info( + "updating number_of_Search Replicas to [{}] for indices {}", + updatedNumberOfSearchReplicas, + actualIndices + ); + } + } + if (!openIndices.isEmpty()) { for (Index index : openIndices) { IndexMetadata indexMetadata = metadataBuilder.getSafe(index); @@ -469,4 +501,27 @@ public ClusterState execute(ClusterState currentState) { } ); } + + /** + * Validates that if we are trying to update search replica count the index is segrep enabled. + * + * @param requestSettings {@link Settings} + * @param indices indices that are changing + * @param currentState {@link ClusterState} current cluster state + */ + private void validateSearchReplicaCountSettings(Settings requestSettings, Index[] indices, ClusterState currentState) { + final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings); + if (updatedNumberOfSearchReplicas > 0) { + if (Arrays.stream(indices).allMatch(index -> currentState.metadata().isSegmentReplicationEnabled(index.getName())) == false) { + throw new IllegalArgumentException( + "To set " + + SETTING_NUMBER_OF_SEARCH_REPLICAS + + ", " + + INDEX_REPLICATION_TYPE_SETTING.getKey() + + " must be set to " + + ReplicationType.SEGMENT + ); + } + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 7c179f6d4d8fd..80c9d42cdb617 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -139,7 +139,7 @@ boolean validate(Metadata metadata) { // check the replicas for (IndexShardRoutingTable indexShardRoutingTable : this) { int routingNumberOfReplicas = indexShardRoutingTable.size() - 1; - if (routingNumberOfReplicas != indexMetadata.getNumberOfReplicas()) { + if (routingNumberOfReplicas != indexMetadata.getNumberOfReplicas() + indexMetadata.getNumberOfSearchOnlyReplicas()) { throw new IllegalStateException( "Shard [" + indexShardRoutingTable.shardId().id() @@ -157,7 +157,9 @@ boolean validate(Metadata metadata) { ); } final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(shardRouting.id()); - if (shardRouting.active() && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { + if (shardRouting.active() + && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false + && shardRouting.isSearchOnly() == false) { throw new IllegalStateException( "active shard routing " + shardRouting @@ -594,6 +596,17 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas ) ); } + for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned( + shardId, + false, + true, + PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled + unassignedInfo + ) + ); + } shards.put(shardNumber, indexShardRoutingBuilder.build()); } return this; @@ -614,6 +627,26 @@ public Builder addReplica() { return this; } + /** + * Add a Search only replica to the IndexShardRoutingTable + * @return The Builder + */ + public Builder addSearchReplica() { + for (final int shardNumber : shards.keySet()) { + ShardId shardId = new ShardId(index, shardNumber); + // version 0, will get updated when reroute will happen + ShardRouting shard = ShardRouting.newUnassigned( + shardId, + false, + true, + PeerRecoverySource.INSTANCE, // TODO: Change to remote store if enabled + new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null) + ); + shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build()); + } + return this; + } + public Builder removeReplica() { for (final int shardId : shards.keySet()) { IndexShardRoutingTable indexShard = shards.get(shardId); @@ -629,7 +662,7 @@ public Builder removeReplica() { // first check if there is one that is not assigned to a node, and remove it boolean removed = false; for (ShardRouting shardRouting : indexShard) { - if (!shardRouting.primary() && !shardRouting.assignedToNode()) { + if (!shardRouting.primary() && !shardRouting.assignedToNode() && !shardRouting.isSearchOnly()) { builder.removeShard(shardRouting); removed = true; break; @@ -637,7 +670,45 @@ public Builder removeReplica() { } if (!removed) { for (ShardRouting shardRouting : indexShard) { - if (!shardRouting.primary()) { + if (!shardRouting.primary() && !shardRouting.isSearchOnly()) { + builder.removeShard(shardRouting); + break; + } + } + } + shards.put(shardId, builder.build()); + } + return this; + } + + /** + * Remove a Search only replica from the IndexShardRoutingTable + * @return The Builder + */ + public Builder removeSearchReplica() { + for (final int shardId : shards.keySet()) { + IndexShardRoutingTable indexShardRoutingTable = shards.get(shardId); + if (indexShardRoutingTable.searchOnlyReplicas().isEmpty()) { + // nothing to do here! + return this; + } + // re-add all the current ones + IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShardRoutingTable.shardId()); + for (ShardRouting shardRouting : indexShardRoutingTable) { + builder.addShard(shardRouting); + } + // first check if there is one that is not assigned to a node, and remove it + boolean removed = false; + for (ShardRouting shardRouting : indexShardRoutingTable) { + if (shardRouting.isSearchOnly() && !shardRouting.assignedToNode()) { + builder.removeShard(shardRouting); + removed = true; + break; + } + } + if (!removed) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + if (shardRouting.isSearchOnly()) { builder.removeShard(shardRouting); break; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 479143fa9a2f0..00ddef507a162 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -211,6 +211,24 @@ public List getShards() { return shards(); } + /** + * Returns a {@link List} of the search only shards in the RoutingTable + * + * @return a {@link List} of shards + */ + public List searchOnlyReplicas() { + return replicas.stream().filter(ShardRouting::isSearchOnly).collect(Collectors.toList()); + } + + /** + * Returns a {@link List} of the writer replicas (primary eligible) shards in the RoutingTable + * + * @return a {@link List} of shards + */ + public List writerReplicas() { + return replicas.stream().filter(r -> r.isSearchOnly() == false).collect(Collectors.toList()); + } + /** * Returns a {@link List} of active shards * diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 04a44b86635d0..884387b45e015 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -385,7 +385,7 @@ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { // be accessible. Therefore, we need to protect against the version being null // (meaning the node will be going away). return assignedShards(shardId).stream() - .filter(shr -> !shr.primary() && shr.active()) + .filter(shr -> !shr.primary() && shr.active() && !shr.isSearchOnly()) .filter(shr -> node(shr.currentNodeId()) != null) .max( Comparator.comparing( @@ -409,7 +409,7 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) { // It's possible for replicaNodeVersion to be null. Therefore, we need to protect against the version being null // (meaning the node will be going away). return assignedShards(shardId).stream() - .filter(shr -> !shr.primary() && shr.active()) + .filter(shr -> !shr.primary() && shr.active() && !shr.isSearchOnly()) .filter(shr -> node(shr.currentNodeId()) != null) .min( Comparator.comparing( @@ -428,7 +428,7 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) { * are preferred for primary promotion */ public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) { - return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active()).filter((shr) -> { + return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active() && !shr.isSearchOnly()).filter((shr) -> { RoutingNode nd = node(shr.currentNodeId()); return (nd != null && nd.node().isRemoteStoreNode()); }).findFirst().orElse(null); @@ -820,6 +820,7 @@ private ShardRouting cancelRelocation(ShardRouting shard) { private ShardRouting promoteActiveReplicaShardToPrimary(ShardRouting replicaShard) { assert replicaShard.active() : "non-active shard cannot be promoted to primary: " + replicaShard; assert replicaShard.primary() == false : "primary shard cannot be promoted to primary: " + replicaShard; + assert replicaShard.isSearchOnly() == false : "search only replica cannot be promoted to primary: " + replicaShard; ShardRouting primaryShard = replicaShard.moveActiveReplicaToPrimary(); updateAssigned(replicaShard, primaryShard); return primaryShard; diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 6c7b94f316da2..f805ed09c2512 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -520,7 +520,7 @@ public Builder updateNumberOfReplicas(final int numberOfReplicas, final String[] // ignore index missing failure, its closed... continue; } - int currentNumberOfReplicas = indexRoutingTable.shards().get(0).size() - 1; // remove the required primary + int currentNumberOfReplicas = indexRoutingTable.shards().get(0).writerReplicas().size(); IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex()); // re-add all the shards for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { @@ -541,6 +541,45 @@ public Builder updateNumberOfReplicas(final int numberOfReplicas, final String[] return this; } + /** + * Update the number of search replicas for the specified indices. + * + * @param numberOfSearchReplicas the number of replicas + * @param indices the indices to update the number of replicas for + * @return the builder + */ + public Builder updateNumberOfSearchReplicas(final int numberOfSearchReplicas, final String[] indices) { + if (indicesRouting == null) { + throw new IllegalStateException("once build is called the builder cannot be reused"); + } + for (String index : indices) { + IndexRoutingTable indexRoutingTable = indicesRouting.get(index); + if (indexRoutingTable == null) { + // ignore index missing failure, its closed... + continue; + } + IndexShardRoutingTable shardRoutings = indexRoutingTable.shards().get(0); + int currentNumberOfSearchReplicas = shardRoutings.searchOnlyReplicas().size(); + IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(indexRoutingTable.getIndex()); + // re-add all the shards + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + builder.addIndexShard(indexShardRoutingTable); + } + if (currentNumberOfSearchReplicas < numberOfSearchReplicas) { + // now, add "empty" ones + for (int i = 0; i < (numberOfSearchReplicas - currentNumberOfSearchReplicas); i++) { + builder.addSearchReplica(); + } + } else if (currentNumberOfSearchReplicas > numberOfSearchReplicas) { + for (int i = 0; i < (currentNumberOfSearchReplicas - numberOfSearchReplicas); i++) { + builder.removeSearchReplica(); + } + } + indicesRouting.put(index, builder.build()); + } + return this; + } + public Builder addAsNew(IndexMetadata indexMetadata) { if (indexMetadata.getState() == IndexMetadata.State.OPEN) { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew( diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index 45de045a8fc69..ada35caa1e61e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -32,11 +32,13 @@ package org.opensearch.cluster.routing; +import org.opensearch.Version; import org.opensearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource; import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -67,6 +69,7 @@ public class ShardRouting implements Writeable, ToXContentObject { private final String currentNodeId; private final String relocatingNodeId; private final boolean primary; + private final boolean searchOnly; private final ShardRoutingState state; private final RecoverySource recoverySource; private final UnassignedInfo unassignedInfo; @@ -85,6 +88,7 @@ protected ShardRouting( String currentNodeId, String relocatingNodeId, boolean primary, + boolean searchOnly, ShardRoutingState state, RecoverySource recoverySource, UnassignedInfo unassignedInfo, @@ -95,6 +99,7 @@ protected ShardRouting( this.currentNodeId = currentNodeId; this.relocatingNodeId = relocatingNodeId; this.primary = primary; + this.searchOnly = searchOnly; this.state = state; this.recoverySource = recoverySource; this.unassignedInfo = unassignedInfo; @@ -116,6 +121,31 @@ protected ShardRouting( + this; } + protected ShardRouting( + ShardId shardId, + String relocatingNodeId, + String currentNodeId, + boolean primary, + ShardRoutingState shardRoutingState, + RecoverySource recoverySource, + UnassignedInfo unassignedInfo, + AllocationId allocationId, + long expectedShardSize + ) { + this( + shardId, + relocatingNodeId, + currentNodeId, + primary, + false, + shardRoutingState, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSize + ); + } + @Nullable private ShardRouting initializeTargetRelocatingShard() { if (state == ShardRoutingState.RELOCATING) { @@ -124,6 +154,7 @@ private ShardRouting initializeTargetRelocatingShard() { relocatingNodeId, currentNodeId, primary, + searchOnly, ShardRoutingState.INITIALIZING, PeerRecoverySource.INSTANCE, unassignedInfo, @@ -143,12 +174,26 @@ public static ShardRouting newUnassigned( boolean primary, RecoverySource recoverySource, UnassignedInfo unassignedInfo + ) { + return newUnassigned(shardId, primary, false, recoverySource, unassignedInfo); + } + + /** + * Creates a new unassigned shard, overloaded for bwc for searchOnly addition. + */ + public static ShardRouting newUnassigned( + ShardId shardId, + boolean primary, + boolean search, + RecoverySource recoverySource, + UnassignedInfo unassignedInfo ) { return new ShardRouting( shardId, null, null, primary, + search, ShardRoutingState.UNASSIGNED, recoverySource, unassignedInfo, @@ -280,6 +325,13 @@ public boolean primary() { return this.primary; } + /** + * Returns true iff this shard is a search only replica. + */ + public boolean isSearchOnly() { + return searchOnly; + } + /** * The shard state. */ @@ -306,6 +358,11 @@ public ShardRouting(ShardId shardId, StreamInput in) throws IOException { currentNodeId = in.readOptionalString(); relocatingNodeId = in.readOptionalString(); primary = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_2_17_0)) { + searchOnly = in.readBoolean(); + } else { + searchOnly = false; + } state = ShardRoutingState.fromValue(in.readByte()); if (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) { recoverySource = RecoverySource.readFrom(in); @@ -339,6 +396,9 @@ public void writeToThin(StreamOutput out) throws IOException { out.writeOptionalString(currentNodeId); out.writeOptionalString(relocatingNodeId); out.writeBoolean(primary); + if (out.getVersion().onOrAfter(Version.V_2_17_0)) { + out.writeBoolean(searchOnly); + } out.writeByte(state.value()); if (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) { recoverySource.writeTo(out); @@ -364,6 +424,7 @@ public ShardRouting updateUnassigned(UnassignedInfo unassignedInfo, RecoverySour currentNodeId, relocatingNodeId, primary, + searchOnly, state, recoverySource, unassignedInfo, @@ -392,6 +453,7 @@ public ShardRouting moveToUnassigned(UnassignedInfo unassignedInfo) { null, null, primary, + searchOnly, ShardRoutingState.UNASSIGNED, recoverySource, unassignedInfo, @@ -419,6 +481,7 @@ public ShardRouting initialize(String nodeId, @Nullable String existingAllocatio nodeId, null, primary, + searchOnly, ShardRoutingState.INITIALIZING, recoverySource, unassignedInfo, @@ -439,6 +502,7 @@ public ShardRouting relocate(String relocatingNodeId, long expectedShardSize) { currentNodeId, relocatingNodeId, primary, + searchOnly, ShardRoutingState.RELOCATING, recoverySource, null, @@ -460,6 +524,7 @@ public ShardRouting cancelRelocation() { currentNodeId, null, primary, + searchOnly, ShardRoutingState.STARTED, recoverySource, null, @@ -483,6 +548,7 @@ public ShardRouting removeRelocationSource() { currentNodeId, null, primary, + searchOnly, state, recoverySource, unassignedInfo, @@ -503,6 +569,7 @@ public ShardRouting reinitializeReplicaShard() { currentNodeId, null, primary, + searchOnly, ShardRoutingState.INITIALIZING, recoverySource, unassignedInfo, @@ -528,6 +595,7 @@ public ShardRouting moveToStarted() { currentNodeId, null, primary, + searchOnly, ShardRoutingState.STARTED, null, null, @@ -546,11 +614,15 @@ public ShardRouting moveActivePrimaryToReplica() { if (!primary) { throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica"); } + if (searchOnly) { + throw new IllegalShardRoutingStateException(this, "Cannot move a primary to a search only replica"); + } return new ShardRouting( shardId, currentNodeId, relocatingNodeId, false, + false, state, recoverySource, unassignedInfo, @@ -569,11 +641,15 @@ public ShardRouting moveActiveReplicaToPrimary() { if (primary) { throw new IllegalShardRoutingStateException(this, "Already primary, can't move to primary"); } + if (searchOnly) { + throw new IllegalShardRoutingStateException(this, "Cannot move a search only replica to primary"); + } return new ShardRouting( shardId, currentNodeId, relocatingNodeId, true, + false, state, recoverySource, unassignedInfo, @@ -811,7 +887,11 @@ public String shortSummary() { if (primary) { sb.append("[P]"); } else { - sb.append("[R]"); + if (searchOnly) { + sb.append("[S]"); + } else { + sb.append("[R]"); + } } if (recoverySource != null) { sb.append(", recovery_source[").append(recoverySource).append("]"); @@ -831,10 +911,11 @@ public String shortSummary() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject() - .field("state", state()) - .field("primary", primary()) - .field("node", currentNodeId()) + XContentBuilder fieldBuilder = builder.startObject().field("state", state()).field("primary", primary()); + if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL)) { + fieldBuilder.field("searchOnly", isSearchOnly()); + } + fieldBuilder.field("node", currentNodeId()) .field("relocating_node", relocatingNodeId()) .field("shard", id()) .field("index", getIndexName()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index 2431f57a6a1f9..113d5803c1d65 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -259,7 +259,9 @@ private IndexMetadata.Builder updateInSyncAllocations( // We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set // Only trim the set of allocation ids when it grows, otherwise we might trim too eagerly when the number // of replicas was decreased while shards were unassigned. - int maxActiveShards = oldIndexMetadata.getNumberOfReplicas() + 1; // +1 for the primary + int maxActiveShards = oldIndexMetadata.getNumberOfReplicas() + oldIndexMetadata.getNumberOfSearchOnlyReplicas() + 1; // +1 for + // the + // primary IndexShardRoutingTable newShardRoutingTable = newRoutingTable.shardRoutingTable(shardId); assert newShardRoutingTable.assignedShards() .stream() diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 61e7aaed5ecff..4bde1e282fe78 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -357,7 +357,7 @@ private ShardRouting initializingShard(ShardRouting shardRouting, String current @Override public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) { int outgoingRecoveries = 0; - if (!shardRouting.primary()) { + if (!shardRouting.primary() && !shardRouting.isSearchOnly()) { ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId()); } else { diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index a6b241bf68209..fec19e9d286b8 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -39,6 +39,7 @@ protected FeatureFlagSettings( FeatureFlags.PLUGGABLE_CACHE_SETTING, FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING, FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING, - FeatureFlags.STAR_TREE_INDEX_SETTING + FeatureFlags.STAR_TREE_INDEX_SETTING, + FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 6ae8f3d2057c6..a765fc31e0c1a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -274,7 +274,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { */ public static final Map> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( FeatureFlags.TIERED_REMOTE_INDEX, - List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING, IndexModule.INDEX_TIERING_STATE) + List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING, IndexModule.INDEX_TIERING_STATE), + FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, + List.of(IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING) ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 023e4cc1c0df5..f391547b2055e 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -72,6 +72,8 @@ public class FeatureFlags { */ public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled"; + public static final String READER_WRITER_SPLIT_EXPERIMENTAL = "opensearch.experimental.feature.read.write.split.enabled"; + public static final Setting REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( REMOTE_STORE_MIGRATION_EXPERIMENTAL, false, @@ -100,6 +102,12 @@ public class FeatureFlags { Property.NodeScope ); + public static final Setting READER_WRITER_SPLIT_EXPERIMENTAL_SETTING = Setting.boolSetting( + READER_WRITER_SPLIT_EXPERIMENTAL, + false, + Property.NodeScope + ); + /** * Gates the functionality of application based configuration templates. */ @@ -127,7 +135,8 @@ public class FeatureFlags { PLUGGABLE_CACHE_SETTING, REMOTE_PUBLICATION_EXPERIMENTAL_SETTING, APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING, - STAR_TREE_INDEX_SETTING + STAR_TREE_INDEX_SETTING, + READER_WRITER_SPLIT_EXPERIMENTAL_SETTING ); /** diff --git a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java index f585267f21832..ccfaf50da1c6b 100644 --- a/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/opensearch/index/shard/ReplicationGroup.java @@ -72,7 +72,8 @@ public ReplicationGroup( this.replicationTargets = new ArrayList<>(); this.skippedShards = new ArrayList<>(); for (final ShardRouting shard : routingTable) { - if (shard.unassigned()) { + // search only replicas never receive any replicated operations + if (shard.unassigned() || shard.isSearchOnly()) { assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard; skippedShards.add(shard); } else { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index f6ed113019897..5cef7b75cb2d4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -171,7 +171,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (indexService.getIndexSettings().isSegRepEnabledOrRemoteNode() && event.indexRoutingTableChanged(indexService.index().getName())) { for (IndexShard shard : indexService) { - if (shard.routingEntry().primary() == false) { + if (shard.routingEntry().primary() == false && shard.routingEntry().isSearchOnly() == false) { // for this shard look up its primary routing, if it has completed a relocation trigger replication final String previousNode = event.previousState() .routingTable() diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 0205c610e6b1a..dedf6dc305cb6 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -321,7 +321,11 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe if (shard.primary()) { table.addCell("p"); } else { - table.addCell("r"); + if (shard.isSearchOnly()) { + table.addCell("s"); + } else { + table.addCell("r"); + } } table.addCell(shard.state()); table.addCell(getOrNull(commonStats, CommonStats::getDocs, DocsStats::getCount)); diff --git a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java index ec5fc1d19e40d..6eb697d493bf4 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/ReplicationOperationTests.java @@ -42,13 +42,19 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; @@ -89,6 +95,7 @@ import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; import static org.opensearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.opensearch.action.support.replication.ReplicationOperation.RetryOnPrimaryException; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; @@ -944,6 +951,83 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint) assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state, trackedShards).size())); } + public void testReplicationOperationsAreNotSentToSearchReplicas() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + ClusterState initialState = stateWithActivePrimary(index, true, randomInt(5)); + IndexMetadata indexMetadata = initialState.getMetadata().index(index); + // add a search only replica + DiscoveryNode node = new DiscoveryNode( + "nodeForSearchShard", + OpenSearchTestCase.buildNewFakeTransportAddress(), + Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), + Version.CURRENT + ); + IndexMetadata.Builder indexMetadataBuilder = new IndexMetadata.Builder(indexMetadata); + indexMetadataBuilder.settings(Settings.builder().put(indexMetadata.getSettings()).put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)); + + ShardRouting searchShardRouting = TestShardRouting.newShardRouting( + shardId, + node.getId(), + null, + false, + true, + ShardRoutingState.STARTED, + null + ); + IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(indexShardRoutingTable); + indexShardRoutingBuilder.addShard(searchShardRouting); + indexShardRoutingTable = indexShardRoutingBuilder.build(); + + ClusterState.Builder state = ClusterState.builder(initialState); + state.nodes(DiscoveryNodes.builder(initialState.nodes()).add(node).build()); + state.metadata(Metadata.builder().put(indexMetadataBuilder.build(), false)); + state.routingTable( + RoutingTable.builder().add(IndexRoutingTable.builder(indexMetadata.getIndex()).addIndexShard(indexShardRoutingTable)).build() + ); + initialState = state.build(); + // execute a request and check hits + + final Set trackedShards = new HashSet<>(); + final Set untrackedShards = new HashSet<>(); + ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); + addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards); + final ReplicationGroup replicationGroup = new ReplicationGroup( + indexShardRoutingTable, + indexMetadata.inSyncAllocationIds(0), + trackedShards, + 0 + ); + + // shards are not part of the rg + assertFalse(replicationGroup.getReplicationTargets().stream().anyMatch(ShardRouting::isSearchOnly)); + + Set initial = getExpectedReplicas(shardId, initialState, trackedShards); + final Set expectedReplicas = initial.stream().filter(shr -> shr.isSearchOnly() == false).collect(Collectors.toSet()); + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final TestReplicaProxy replicasProxy = new TestReplicaProxy(new HashMap<>()); + + final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool); + final TestReplicationOperation op = new TestReplicationOperation( + request, + primary, + listener, + replicasProxy, + indexMetadata.primaryTerm(0), + new FanoutReplicationProxy<>(replicasProxy) + ); + op.execute(); + assertTrue("request was not processed on primary", request.processedOnPrimary.get()); + assertEquals(request.processedOnReplicas, expectedReplicas); + assertEquals(replicasProxy.failedReplicas, Collections.emptySet()); + assertEquals(replicasProxy.markedAsStaleCopies, Collections.emptySet()); + assertTrue(listener.isDone()); + } + private Set getExpectedReplicas(ShardId shardId, ClusterState state, Set trackedShards) { Set expectedReplicas = new HashSet<>(); String localNodeId = state.nodes().getLocalNodeId(); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 80a74bdd82145..712e574ea57d7 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -129,10 +129,12 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; @@ -146,6 +148,7 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases; +import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING; @@ -1205,7 +1208,7 @@ public void testParseMappingsWithTypelessTemplate() throws Exception { assertThat(mappings, Matchers.hasKey(MapperService.SINGLE_MAPPING_NAME)); } - public void testvalidateIndexSettings() { + public void testValidateIndexSettings() { ClusterService clusterService = mock(ClusterService.class); Metadata metadata = Metadata.builder() .transientSettings(Settings.builder().put(Metadata.DEFAULT_REPLICA_COUNT_SETTING.getKey(), 1).build()) @@ -2240,6 +2243,71 @@ public void testIndexCreationWithIndexStoreTypeRemoteStoreThrowsException() { ); } + public void testDefaultSearchReplicasSetting() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); + Settings templateSettings = Settings.EMPTY; + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + final Settings.Builder requestSettings = Settings.builder(); + request.settings(requestSettings.build()); + Settings indexSettings = aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + templateSettings, + null, + Settings.EMPTY, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + clusterSettings + ); + assertFalse(INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(indexSettings)); + } + + public void testSearchReplicasValidationWithSegmentReplication() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); + Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2); + request.settings(requestSettings.build()); + Settings indexSettings = aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + templateSettings, + null, + Settings.EMPTY, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + clusterSettings + ); + assertEquals("2", indexSettings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS)); + assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + } + + public void testSearchReplicasValidationWithDocumentReplication() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); + Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2); + request.settings(requestSettings.build()); + + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + templateSettings, + null, + Settings.EMPTY, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + clusterSettings + ) + ); + assertEquals("To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", exception.getMessage()); + } + private IndexTemplateMetadata addMatchingTemplate(Consumer configurator) { IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*"); configurator.accept(builder); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java new file mode 100644 index 0000000000000..b1dd397c97218 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java @@ -0,0 +1,214 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.reroute.ClusterRerouteRequest; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.ShardLimitValidator; +import org.opensearch.indices.cluster.ClusterStateChanges; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; + +public class SearchOnlyReplicaTests extends OpenSearchTestCase { + + public void testUpdateSearchReplicaCount() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); + final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + + try { + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + // node for search replicas - we'll start with 1 and add another + for (int i = 0; i < 2; i++) { + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + } + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(1, indexShardRoutingTable.replicaShards().size()); + assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + + // add another replica + state = cluster.updateSettings( + state, + new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) + ); + rerouteUntilActive(state, cluster); + indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(2, indexShardRoutingTable.replicaShards().size()); + assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + + // remove all replicas + state = cluster.updateSettings( + state, + new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) + ); + rerouteUntilActive(state, cluster); + indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(0, indexShardRoutingTable.replicaShards().size()); + assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + } finally { + terminate(threadPool); + } + } + + public void testUpdateSearchReplicasOverShardLimit() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); + final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + + try { + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + + // add another replica + ClusterState finalState = state; + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); + expectThrows( + RuntimeException.class, + () -> cluster.updateSettings( + finalState, + new UpdateSettingsRequest("index").settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() + ) + ) + ); + + } finally { + terminate(threadPool); + } + } + + public void testUpdateSearchReplicasOnDocrepCluster() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); + final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + + try { + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + + // add another replica + ClusterState finalState = state; + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); + expectThrows( + RuntimeException.class, + () -> cluster.updateSettings( + finalState, + new UpdateSettingsRequest("index").settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() + ) + ) + ); + } finally { + terminate(threadPool); + } + } + + private static void rerouteUntilActive(ClusterState state, ClusterStateChanges cluster) { + while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + state = cluster.applyStartedShards( + state, + state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING) + ); + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + } + + private static final AtomicInteger nodeIdGenerator = new AtomicInteger(); + + protected DiscoveryNode createNode(Version version, DiscoveryNodeRole... mustHaveRoles) { + Set roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)); + Collections.addAll(roles, mustHaveRoles); + final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); + return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, version); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java index b53e520581321..5082d6ab0a37c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java @@ -41,7 +41,9 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.settings.Settings; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -188,4 +190,248 @@ public void testUpdateNumberOfReplicas() { newState = strategy.reroute(clusterState, "reroute"); assertThat(newState, equalTo(clusterState)); } + + public void testUpdateNumberOfReplicasDoesNotImpactSearchReplicas() { + AllocationService strategy = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); + + logger.info("Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .numberOfSearchReplicas(1) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + assertEquals(1, routingTable.index("test").shards().size()); + IndexShardRoutingTable shardRoutingTable = routingTable.index("test").shard(0); + // 1 primary, 1 replica, 1 search replica + assertEquals(3, shardRoutingTable.size()); + assertEquals(2, shardRoutingTable.replicaShards().size()); + assertEquals(1, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(UNASSIGNED, shardRoutingTable.shards().get(0).state()); + assertEquals(UNASSIGNED, shardRoutingTable.shards().get(1).state()); + assertEquals(UNASSIGNED, shardRoutingTable.shards().get(2).state()); + assertNull(shardRoutingTable.shards().get(0).currentNodeId()); + assertNull(shardRoutingTable.shards().get(1).currentNodeId()); + assertNull(shardRoutingTable.shards().get(2).currentNodeId()); + + logger.info("Adding two nodes and performing rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3"))) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + + logger.info("Start all the primary shards"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("Start all the replica and search shards"); + ClusterState newState = startInitializingShardsAndReroute(strategy, clusterState); + assertNotEquals(newState, clusterState); + clusterState = newState; + + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + final String nodeHoldingPrimary = shardRoutingTable.primaryShard().currentNodeId(); + final String nodeHoldingSearchReplica = shardRoutingTable.searchOnlyReplicas().get(0).currentNodeId(); + final String nodeHoldingReplica = shardRoutingTable.writerReplicas().get(0).currentNodeId(); + + assertNotEquals(nodeHoldingPrimary, nodeHoldingReplica); + assertNotEquals(nodeHoldingPrimary, nodeHoldingSearchReplica); + assertNotEquals(nodeHoldingReplica, nodeHoldingSearchReplica); + + assertEquals( + "There is a single routing shard routing table in the cluster", + clusterState.routingTable().index("test").shards().size(), + 1 + ); + assertEquals("There are three shards as part of the shard routing table", 3, shardRoutingTable.size()); + assertEquals("There are two replicas one search and one write", 2, shardRoutingTable.replicaShards().size()); + assertEquals(1, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(STARTED, shardRoutingTable.shards().get(0).state()); + assertEquals(STARTED, shardRoutingTable.shards().get(1).state()); + assertEquals(STARTED, shardRoutingTable.shards().get(2).state()); + + logger.info("add another replica"); + final String[] indices = { "test" }; + routingTable = RoutingTable.builder(clusterState.routingTable()).updateNumberOfReplicas(2, indices).build(); + metadata = Metadata.builder(clusterState.metadata()).updateNumberOfReplicas(2, indices).build(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).metadata(metadata).build(); + IndexMetadata indexMetadata = clusterState.metadata().index("test"); + assertEquals(2, indexMetadata.getNumberOfReplicas()); + assertEquals(1, indexMetadata.getNumberOfSearchOnlyReplicas()); + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + assertEquals(4, shardRoutingTable.size()); + assertEquals(3, shardRoutingTable.replicaShards().size()); + assertEquals(2, shardRoutingTable.writerReplicas().size()); + assertEquals(1, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(shardRoutingTable.primaryShard().state(), STARTED); + assertEquals(shardRoutingTable.searchOnlyReplicas().get(0).state(), STARTED); + + ShardRouting existingReplica = shardRoutingTable.writerReplicas().get(0); + assertEquals(existingReplica.state(), STARTED); + assertEquals(existingReplica.currentNodeId(), nodeHoldingReplica); + ShardRouting newReplica = shardRoutingTable.writerReplicas().get(0); + assertEquals(newReplica.state(), STARTED); + + logger.info("Add another node and start the added replica"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4"))).build(); + newState = strategy.reroute(clusterState, "reroute"); + newState = startInitializingShardsAndReroute(strategy, newState); + assertNotEquals(newState, clusterState); + clusterState = newState; + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + for (ShardRouting replicaShard : shardRoutingTable.replicaShards()) { + assertEquals(replicaShard.state(), STARTED); + } + assertTrue(shardRoutingTable.replicaShards().stream().allMatch(r -> r.state().equals(STARTED))); + + // remove both replicas and assert search replica is unchanged + routingTable = RoutingTable.builder(clusterState.routingTable()).updateNumberOfReplicas(0, indices).build(); + metadata = Metadata.builder(clusterState.metadata()).updateNumberOfReplicas(0, indices).build(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).metadata(metadata).build(); + indexMetadata = clusterState.metadata().index("test"); + assertEquals(0, indexMetadata.getNumberOfReplicas()); + assertEquals(1, indexMetadata.getNumberOfSearchOnlyReplicas()); + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + assertEquals(2, shardRoutingTable.size()); + assertEquals(1, shardRoutingTable.replicaShards().size()); + assertEquals(0, shardRoutingTable.writerReplicas().size()); + assertEquals(1, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(shardRoutingTable.primaryShard().state(), STARTED); + assertEquals(shardRoutingTable.searchOnlyReplicas().get(0).state(), STARTED); + assertEquals(shardRoutingTable.searchOnlyReplicas().get(0).currentNodeId(), nodeHoldingSearchReplica); + } + + public void testUpdateSearchReplicasDoesNotImpactRegularReplicas() { + AllocationService strategy = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); + + logger.info("Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .numberOfSearchReplicas(1) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + assertEquals(1, routingTable.index("test").shards().size()); + IndexShardRoutingTable shardRoutingTable = routingTable.index("test").shard(0); + // 1 primary, 1 replica, 1 search replica + assertEquals(3, shardRoutingTable.size()); + assertEquals(2, shardRoutingTable.replicaShards().size()); + assertEquals(1, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(UNASSIGNED, shardRoutingTable.shards().get(0).state()); + assertEquals(UNASSIGNED, shardRoutingTable.shards().get(1).state()); + assertEquals(UNASSIGNED, shardRoutingTable.shards().get(2).state()); + assertNull(shardRoutingTable.shards().get(0).currentNodeId()); + assertNull(shardRoutingTable.shards().get(1).currentNodeId()); + assertNull(shardRoutingTable.shards().get(2).currentNodeId()); + + logger.info("Adding three nodes and performing rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3"))) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + + logger.info("Start all the primary shards"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("Start all the replica and search shards"); + ClusterState newState = startInitializingShardsAndReroute(strategy, clusterState); + assertNotEquals(newState, clusterState); + clusterState = newState; + + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + final String nodeHoldingPrimary = shardRoutingTable.primaryShard().currentNodeId(); + final String nodeHoldingSearchReplica = shardRoutingTable.searchOnlyReplicas().get(0).currentNodeId(); + final String nodeHoldingReplica = shardRoutingTable.writerReplicas().get(0).currentNodeId(); + + assertNotEquals(nodeHoldingPrimary, nodeHoldingReplica); + assertNotEquals(nodeHoldingPrimary, nodeHoldingSearchReplica); + assertNotEquals(nodeHoldingReplica, nodeHoldingSearchReplica); + + assertEquals( + "There is a single routing shard routing table in the cluster", + clusterState.routingTable().index("test").shards().size(), + 1 + ); + assertEquals("There are three shards as part of the shard routing table", 3, shardRoutingTable.size()); + assertEquals("There are two replicas one search and one write", 2, shardRoutingTable.replicaShards().size()); + assertEquals(1, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(STARTED, shardRoutingTable.shards().get(0).state()); + assertEquals(STARTED, shardRoutingTable.shards().get(1).state()); + assertEquals(STARTED, shardRoutingTable.shards().get(2).state()); + + logger.info("add another replica"); + final String[] indices = { "test" }; + routingTable = RoutingTable.builder(clusterState.routingTable()).updateNumberOfSearchReplicas(2, indices).build(); + metadata = Metadata.builder(clusterState.metadata()).updateNumberOfSearchReplicas(2, indices).build(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).metadata(metadata).build(); + IndexMetadata indexMetadata = clusterState.metadata().index("test"); + assertEquals(1, indexMetadata.getNumberOfReplicas()); + assertEquals(2, indexMetadata.getNumberOfSearchOnlyReplicas()); + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + assertEquals(4, shardRoutingTable.size()); + assertEquals(3, shardRoutingTable.replicaShards().size()); + assertEquals(1, shardRoutingTable.writerReplicas().size()); + assertEquals(2, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(shardRoutingTable.primaryShard().state(), STARTED); + assertEquals(shardRoutingTable.writerReplicas().get(0).state(), STARTED); + assertEquals(shardRoutingTable.searchOnlyReplicas().get(0).state(), STARTED); + assertEquals(shardRoutingTable.searchOnlyReplicas().get(1).state(), UNASSIGNED); + + logger.info("Add another node and start the added replica"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4"))).build(); + newState = strategy.reroute(clusterState, "reroute"); + newState = startInitializingShardsAndReroute(strategy, newState); + assertNotEquals(newState, clusterState); + clusterState = newState; + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + for (ShardRouting replicaShard : shardRoutingTable.replicaShards()) { + assertEquals(replicaShard.state(), STARTED); + } + assertTrue(shardRoutingTable.replicaShards().stream().allMatch(r -> r.state().equals(STARTED))); + + // remove both replicas and assert search replica is unchanged + routingTable = RoutingTable.builder(clusterState.routingTable()).updateNumberOfSearchReplicas(0, indices).build(); + metadata = Metadata.builder(clusterState.metadata()).updateNumberOfSearchReplicas(0, indices).build(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).metadata(metadata).build(); + indexMetadata = clusterState.metadata().index("test"); + assertEquals(1, indexMetadata.getNumberOfReplicas()); + assertEquals(0, indexMetadata.getNumberOfSearchOnlyReplicas()); + shardRoutingTable = clusterState.routingTable().index("test").shard(0); + assertEquals(2, shardRoutingTable.size()); + assertEquals(1, shardRoutingTable.replicaShards().size()); + assertEquals(1, shardRoutingTable.writerReplicas().size()); + assertEquals(0, shardRoutingTable.searchOnlyReplicas().size()); + assertEquals(shardRoutingTable.primaryShard().state(), STARTED); + assertEquals(shardRoutingTable.replicaShards().get(0).state(), STARTED); + assertEquals(shardRoutingTable.replicaShards().get(0).currentNodeId(), nodeHoldingReplica); + } } diff --git a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java index b6ca632d4b49a..bea1dcf0cffce 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java @@ -94,8 +94,8 @@ import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsModule; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.index.Index; @@ -291,10 +291,13 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m final AwarenessReplicaBalance awarenessReplicaBalance = new AwarenessReplicaBalance(SETTINGS, clusterService.getClusterSettings()); + // build IndexScopedSettings from a settingsModule so that all settings gated by enabled featureFlags are registered. + SettingsModule settingsModule = new SettingsModule(Settings.EMPTY); + MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService( clusterService, allocationService, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + settingsModule.getIndexScopedSettings(), indicesService, shardLimitValidator, threadPool, @@ -308,7 +311,7 @@ public IndexMetadata upgradeIndexMetadata(IndexMetadata indexMetadata, Version m new AliasValidator(), shardLimitValidator, environment, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + settingsModule.getIndexScopedSettings(), threadPool, xContentRegistry, systemIndices, diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java index c7c71f0f569e5..9a000a4eeda72 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/TestShardRouting.java @@ -319,4 +319,27 @@ public static RecoverySource randomRecoverySource() { ) ); } + + public static ShardRouting newShardRouting( + ShardId shardId, + String currentNodeId, + String relocatingNodeId, + boolean primary, + boolean searchOnly, + ShardRoutingState state, + UnassignedInfo unassignedInfo + ) { + return new ShardRouting( + shardId, + currentNodeId, + relocatingNodeId, + primary, + searchOnly, + state, + buildRecoveryTarget(primary, state), + unassignedInfo, + buildAllocationId(state), + -1 + ); + } }