From 7511677c0fa6bdea59bef1d434ca63e94ef8b41e Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Mon, 18 Mar 2024 16:13:09 +0530 Subject: [PATCH] Common changes Signed-off-by: Lakshya Taragi --- .../RemoteStoreMigrationAllocationIT.java | 536 ++++++++++++++++++ .../RemoteStoreMigrationSettingsUpdateIT.java | 137 +++++ .../TransportClusterUpdateSettingsAction.java | 53 ++ .../metadata/MetadataCreateIndexService.java | 62 +- .../remotestore/RemoteStoreNodeService.java | 15 + ...ransportClusterManagerNodeActionTests.java | 163 ++++++ .../coordination/JoinTaskExecutorTests.java | 10 +- .../MetadataCreateIndexServiceTests.java | 161 +++++- ...eStoreMigrationAllocationDeciderTests.java | 17 +- 9 files changed, 1105 insertions(+), 49 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java new file mode 100644 index 0000000000000..42d1458b0dca8 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationAllocationIT.java @@ -0,0 +1,536 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplanation; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.NodeAllocationResult; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreMigrationAllocationIT extends MigrationBaseTestCase { + + public static final String TEST_INDEX = "test_index"; + 
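+    // Descriptive comment (added for clarity): NAME matches the allocation decider label that
+    // getDecisionForTargetNode() below filters on when reading the allocation-explain response.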
public static final String NAME = "remote_store_migration"; + + private static final ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + private Client client; + + // tests for primary shard copy allocation with MIXED mode and REMOTE_STORE direction + + public void testAllocateNewPrimaryShardForMixedModeAndRemoteStoreDirection() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> add remote and non-remote nodes"); + setClusterMode(MIXED.mode); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> verify expected decision for allocating a new primary shard on a non-remote node"); + prepareIndexWithoutReplica(Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + Decision decision = getDecisionForTargetNode(nonRemoteNode, true, true, false); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + "[remote_store migration_direction]: primary shard copy can not be allocated to a non-remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation on non-remote node"); + attemptAllocation(Optional.empty()); + + logger.info(" --> verify non-allocation of primary shard on non-remote node"); + assertNonAllocation(true); + + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + + logger.info(" --> verify expected decision for allocating a new primary shard on a remote node"); + prepareDecisions(); + decision = getDecisionForTargetNode(remoteNode, true, true, false); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals( + "[remote_store migration_direction]: primary shard copy can be allocated to a remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation on remote node"); + attemptAllocation(Optional.empty()); + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + assertAllocation(true, Optional.of(remoteNode)); + } + + // tests for replica shard copy allocation with MIXED mode and REMOTE_STORE direction + + public void testNewReplicaShardAllocationIfPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteStoreDirection() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> add remote and non-remote nodes"); + setClusterMode(MIXED.mode); + String nonRemoteNodeName1 = internalCluster().startNode(); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode1 = assertNodeInCluster(nonRemoteNodeName1); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + + logger.info(" --> allocate primary shard on non-remote node"); + prepareIndexWithAllocatedPrimary(nonRemoteNode1, Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> verify expected decision for replica shard for remote node"); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(remoteNode, false, true, false); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + 
"[remote_store migration_direction]: replica shard copy can not be allocated to a remote node since primary shard copy is not yet migrated to remote", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation of replica shard on remote node"); + attemptAllocation(Optional.empty()); + + logger.info(" --> verify non-allocation of replica shard"); + assertNonAllocation(false); + + logger.info(" --> add another non-remote node"); + addRemote = false; + String nonRemoteNodeName2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode2 = assertNodeInCluster(nonRemoteNodeName2); + + logger.info(" --> verify expected decision for replica shard for the other non-remote node"); + prepareDecisions(); + decision = getDecisionForTargetNode(nonRemoteNode2, false, true, false); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals( + "[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + logger.info(" --> attempt allocation of replica shard on the other non-remote node"); + attemptAllocation(Optional.empty()); + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of replica shard"); + assertAllocation(false, Optional.of(nonRemoteNode2)); + } + + public void testNewReplicaShardAllocationIfPrimaryShardOnRemoteNodeForMixedModeAndRemoteStoreDirection() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> set mixed cluster compatibility mode"); + setClusterMode(MIXED.mode); + + logger.info(" --> add remote and non-remote nodes"); + addRemote = true; + String remoteNodeName1 = internalCluster().startNode(); + String remoteNodeName2 = internalCluster().startNode(); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode1 = assertNodeInCluster(remoteNodeName1); + DiscoveryNode remoteNode2 = assertNodeInCluster(remoteNodeName2); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> allocate primary shard on remote node"); + prepareIndexWithAllocatedPrimary(remoteNode1, Optional.empty()); + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> verify expected decision for replica shard"); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(remoteNode2, false, true, false); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals( + "[remote_store migration_direction]: replica shard copy can be allocated to a remote node since primary shard copy has been migrated to remote", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + + decision = getDecisionForTargetNode(nonRemoteNode, false, true, false); + assertEquals(Decision.Type.YES, decision.type()); + assertEquals( + "[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node", + decision.getExplanation().toLowerCase(Locale.ROOT) + ); + } + + // test for STRICT mode + + public void testAlwaysAllocateNewShardForStrictMode() throws Exception { + boolean isRemoteCluster = randomBoolean(); + boolean isReplicaAllocation = randomBoolean(); + + logger.info(" --> initialize cluster and add nodes"); + initializeCluster(isRemoteCluster); + String nodeName1 = internalCluster().startNode(); + String nodeName2 = internalCluster().startNode(); + 
internalCluster().validateClusterFormed(); + DiscoveryNode node1 = assertNodeInCluster(nodeName1); + DiscoveryNode node2 = assertNodeInCluster(nodeName2); + + logger.info(" --> verify expected decision for allocating a new shard on a non-remote node"); + if (isReplicaAllocation) { + prepareIndexWithAllocatedPrimary(node1, Optional.empty()); + } else { + prepareIndexWithoutReplica(Optional.empty()); + } + + logger.info(" --> set remote_store direction"); + setDirection(REMOTE_STORE.direction); + + assertEquals( + (isRemoteCluster ? "true" : null), + client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getMetadata() + .index(TEST_INDEX) + .getSettings() + .get(SETTING_REMOTE_STORE_ENABLED) + ); + + prepareDecisions(); + + Decision decision = getDecisionForTargetNode( + isReplicaAllocation ? node2 : randomFrom(node1, node2), + !isReplicaAllocation, + true, + false + ); + assertEquals(Decision.Type.YES, decision.type()); + String expectedReason = String.format( + Locale.ROOT, + "[remote_store migration_direction]: %s shard copy can be allocated to a %s node for strict compatibility mode", + (isReplicaAllocation ? "replica" : "primary"), + (isRemoteCluster ? "remote" : "non-remote") + ); + assertEquals(expectedReason, decision.getExplanation().toLowerCase(Locale.ROOT)); + + logger.info(" --> attempt allocation"); + attemptAllocation(Optional.empty()); + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + assertAllocation(!isReplicaAllocation, !isReplicaAllocation ? Optional.empty() : Optional.of(node2)); + } + + // test for remote store backed index + + public void testDontAllocateToNonRemoteNodeForRemoteStoreBackedIndex() throws Exception { + logger.info(" --> initialize cluster with remote master node"); + initializeCluster(true); + + logger.info(" --> add remote and non-remote nodes"); + String remoteNodeName = internalCluster().startNode(); + setClusterMode(MIXED.mode); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + + boolean isReplicaAllocation = randomBoolean(); + + logger.info(" --> verify expected decision for allocating a new shard on a non-remote node"); + if (isReplicaAllocation) { + prepareIndexWithAllocatedPrimary(remoteNode, Optional.empty()); + } else { + prepareIndexWithoutReplica(Optional.empty()); + } + + assertEquals( + "true", + client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getMetadata() + .index(TEST_INDEX) + .getSettings() + .get(SETTING_REMOTE_STORE_ENABLED) + ); + + setDirection(REMOTE_STORE.direction); + prepareDecisions(); + Decision decision = getDecisionForTargetNode(nonRemoteNode, !isReplicaAllocation, false, false); + assertEquals(Decision.Type.NO, decision.type()); + String expectedReason = String.format( + Locale.ROOT, + "[remote_store migration_direction]: %s shard copy can not be allocated to a non-remote node because a remote store backed index's shard copy can only be allocated to a remote node", + (isReplicaAllocation ? 
"replica" : "primary") + ); + assertEquals(expectedReason, decision.getExplanation().toLowerCase(Locale.ROOT)); + + logger.info(" --> attempt allocation of shard on non-remote node"); + attemptAllocation(Optional.of(nonRemoteNodeName)); + + logger.info(" --> verify non-allocation of shard"); + assertNonAllocation(!isReplicaAllocation); + } + + // bootstrap a cluster + public void initializeCluster(boolean remoteClusterManager) { + addRemote = remoteClusterManager; + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startNodes(1); + client = internalCluster().client(); + } + + // assign settings to be updated randomly as persistent or transient + private static void randomlyAssignPersistentOrTransient(Settings.Builder settingsBuilder) { + updateSettingsRequest.persistentSettings(Settings.EMPTY); + updateSettingsRequest.transientSettings(Settings.EMPTY); + boolean isPersistentSetting = randomBoolean(); + if (isPersistentSetting) { + updateSettingsRequest.persistentSettings(settingsBuilder); + } else { + updateSettingsRequest.transientSettings(settingsBuilder); + } + } + + // set the compatibility mode of cluster [strict, mixed] + public static void setClusterMode(String mode) { + randomlyAssignPersistentOrTransient(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), mode)); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + // set the migration direction for cluster [remote_store, docrep, none] + public static void setDirection(String direction) { + randomlyAssignPersistentOrTransient(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), direction)); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + // verify that the given nodeName exists in cluster + public static DiscoveryNode assertNodeInCluster(String nodeName) { + Map nodes = internalCluster().client().admin().cluster().prepareState().get().getState().nodes().getNodes(); + DiscoveryNode discoveryNode = null; + for (Map.Entry entry : nodes.entrySet()) { + DiscoveryNode node = entry.getValue(); + if (node.getName().equals(nodeName)) { + discoveryNode = node; + break; + } + } + assertNotNull(discoveryNode); + return discoveryNode; + } + + // returns a comma-separated list of node names excluding `except` + public static String allNodesExcept(String except) { + StringBuilder exclude = new StringBuilder(); + DiscoveryNodes allNodes = internalCluster().client().admin().cluster().prepareState().get().getState().nodes(); + for (DiscoveryNode node : allNodes) { + if (node.getName().equals(except) == false) { + exclude.append(node.getName()).append(","); + } + } + return exclude.toString(); + } + + // obtain decision for allocation/relocation of a shard to a given node + private Decision getDecisionForTargetNode( + DiscoveryNode targetNode, + boolean isPrimary, + boolean includeYesDecisions, + boolean isRelocation + ) { + ClusterAllocationExplanation explanation = client.admin() + .cluster() + .prepareAllocationExplain() + .setIndex(TEST_INDEX) + .setShard(0) + .setPrimary(isPrimary) + .setIncludeYesDecisions(includeYesDecisions) + .get() + .getExplanation(); + + Decision requiredDecision = null; + List nodeAllocationResults; + if (isRelocation) { + MoveDecision moveDecision = explanation.getShardAllocationDecision().getMoveDecision(); + nodeAllocationResults = moveDecision.getNodeDecisions(); + } else { + AllocateUnassignedDecision 
allocateUnassignedDecision = explanation.getShardAllocationDecision().getAllocateDecision(); + nodeAllocationResults = allocateUnassignedDecision.getNodeDecisions(); + } + + for (NodeAllocationResult nodeAllocationResult : nodeAllocationResults) { + if (nodeAllocationResult.getNode().equals(targetNode)) { + for (Decision decision : nodeAllocationResult.getCanAllocateDecision().getDecisions()) { + if (decision.label().equals(NAME)) { + requiredDecision = decision; + break; + } + } + } + } + + assertNotNull(requiredDecision); + return requiredDecision; + } + + // create a new test index + public static void prepareIndexWithoutReplica(Optional name) { + String indexName = name.orElse(TEST_INDEX); + internalCluster().client() + .admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.exclude._name", allNodesExcept(null)) + ) + .execute() + .actionGet(); + } + + public void prepareIndexWithAllocatedPrimary(DiscoveryNode primaryShardNode, Optional name) { + String indexName = name.orElse(TEST_INDEX); + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include._name", primaryShardNode.getName()) + .put("index.routing.allocation.exclude._name", allNodesExcept(primaryShardNode.getName())) + ) + .setWaitForActiveShards(ActiveShardCount.ONE) + .execute() + .actionGet(); + + ensureYellowAndNoInitializingShards(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + assertAllocation(true, Optional.of(primaryShardNode)); + + logger.info(" --> verify non-allocation of replica shard"); + assertNonAllocation(false); + } + + // get allocation and relocation decisions for all nodes + private void prepareDecisions() { + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put("index.routing.allocation.exclude._name", allNodesExcept(null))) + .execute() + .actionGet(); + } + + private void attemptAllocation(Optional targetNodeName) { + String nodeName = targetNodeName.orElse(null); + Settings.Builder settingsBuilder; + if (nodeName != null) { + settingsBuilder = Settings.builder() + .put("index.routing.allocation.include._name", nodeName) + .put("index.routing.allocation.exclude._name", allNodesExcept(nodeName)); + } else { + String clusterManagerNodeName = client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getNodes() + .getClusterManagerNode() + .getName(); + // to allocate freely among all nodes other than cluster-manager node + settingsBuilder = Settings.builder() + .put("index.routing.allocation.include._name", allNodesExcept(clusterManagerNodeName)) + .put("index.routing.allocation.exclude._name", clusterManagerNodeName); + } + client.admin().indices().prepareUpdateSettings(TEST_INDEX).setSettings(settingsBuilder).execute().actionGet(); + } + + private ShardRouting getShardRouting(boolean isPrimary) { + IndexShardRoutingTable table = client.admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .getRoutingTable() + .index(TEST_INDEX) + .shard(0); + return (isPrimary ? 
table.primaryShard() : table.replicaShards().get(0)); + } + + // verify that shard does not exist at targetNode + private void assertNonAllocation(boolean isPrimary) { + if (isPrimary) { + ensureRed(TEST_INDEX); + } else { + ensureYellowAndNoInitializingShards(TEST_INDEX); + } + ShardRouting shardRouting = getShardRouting(isPrimary); + assertFalse(shardRouting.active()); + assertNull(shardRouting.currentNodeId()); + assertEquals(ShardRoutingState.UNASSIGNED, shardRouting.state()); + } + + // verify that shard exists at targetNode + private void assertAllocation(boolean isPrimary, Optional targetNode) { + ShardRouting shardRouting = getShardRouting(isPrimary); + assertTrue(shardRouting.active()); + assertNotNull(shardRouting.currentNodeId()); + DiscoveryNode node = targetNode.orElse(null); + if (node != null) { + assertEquals(shardRouting.currentNodeId(), node.getId()); + } + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java new file mode 100644 index 0000000000000..5d99bb365b0fa --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; +import org.opensearch.index.IndexSettings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Optional; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.assertNodeInCluster; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.prepareIndexWithoutReplica; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.setClusterMode; +import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.setDirection; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreMigrationSettingsUpdateIT extends MigrationBaseTestCase { + + private Client client; + + // remote store backed index setting tests + + public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() { + logger.info(" --> initialize cluster: 
gives non remote cluster manager"); + initializeCluster(false); + + String indexName1 = "test_index_1"; + String indexName2 = "test_index_2"; + + logger.info(" --> add non-remote node"); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> create an index"); + prepareIndexWithoutReplica(Optional.of(indexName1)); + + logger.info(" --> verify that non remote-backed index is created"); + assertNonRemoteStoreBackedIndex(indexName1); + + logger.info(" --> set mixed cluster compatibility mode and remote_store direction"); + setClusterMode(MIXED.mode); + setDirection(REMOTE_STORE.direction); + + logger.info(" --> add remote node"); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + assertNodeInCluster(remoteNodeName); + + logger.info(" --> create another index"); + prepareIndexWithoutReplica(Optional.of(indexName2)); + + logger.info(" --> verify that remote backed index is created"); + assertRemoteStoreBackedIndex(indexName2); + } + + // compatibility mode setting test + + public void testSwitchToStrictMode() throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(false); + + logger.info(" --> create a mixed mode cluster"); + setClusterMode(MIXED.mode); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + assertNodeInCluster(remoteNodeName); + assertNodeInCluster(nonRemoteNodeName); + + logger.info(" --> attempt switching to strict mode"); + SettingsException exception = assertThrows(SettingsException.class, () -> setClusterMode(STRICT.mode)); + assertEquals( + "can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes", + exception.getMessage() + ); + + logger.info(" --> stop remote node so that cluster had only non-remote nodes"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName)); + ensureStableCluster(2); + + logger.info(" --> attempt switching to strict mode"); + setClusterMode(STRICT.mode); + } + + // verify that the created index is not remote store backed + private void assertNonRemoteStoreBackedIndex(String indexName) { + Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName); + assertEquals(ReplicationType.DOCUMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); + assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)); + assertNull(indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)); + } + + // verify that the created index is remote store backed + private void assertRemoteStoreBackedIndex(String indexName) { + Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName); + assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + assertEquals("true", indexSettings.get(SETTING_REMOTE_STORE_ENABLED)); + assertEquals(REPOSITORY_NAME, indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)); + assertEquals(REPOSITORY_2_NAME, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)); + assertEquals( + IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + 
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) + ); + } + + private void initializeCluster(boolean remoteClusterManager) { + addRemote = remoteClusterManager; + internalCluster().setBootstrapClusterManagerNodeIndex(0); + internalCluster().startNodes(1); + client = internalCluster().client(); + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 2f3cc77b05550..2d2e5ca696c00 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -45,6 +45,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -53,12 +54,18 @@ import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; /** * Transport action for updating cluster settings @@ -251,6 +258,7 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(final ClusterState currentState) { + validateCompatibilityModeSettingRequest(request, state); final ClusterState clusterState = updater.updateSettings( currentState, clusterSettings.upgradeSettings(request.transientSettings()), @@ -264,4 +272,49 @@ public ClusterState execute(final ClusterState currentState) { ); } + /** + * Runs various checks associated with changing cluster compatibility mode + * @param request cluster settings update request, for settings to be updated and new values + * @param clusterState current state of cluster, for information on nodes + */ + public static void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) { + Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); + if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) { + String value = settings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey()).toLowerCase(Locale.ROOT); + validateAllNodesOfSameVersion(clusterState.nodes()); + if (value.equals(RemoteStoreNodeService.CompatibilityMode.STRICT.mode)) { + validateAllNodesOfSameType(clusterState.nodes()); + } + } + } + + /** + * Verifies that while trying to change the compatibility mode, all nodes must have the same version. 
+ * If not, it throws SettingsException error + * @param discoveryNodes current discovery nodes in the cluster + */ + private static void validateAllNodesOfSameVersion(DiscoveryNodes discoveryNodes) { + if (discoveryNodes.getMaxNodeVersion().equals(discoveryNodes.getMinNodeVersion()) == false) { + throw new SettingsException("can not change the compatibility mode when all the nodes in cluster are not of the same version"); + } + } + + /** + * Verifies that while trying to switch to STRICT compatibility mode, all nodes must be of the + * same type (all remote or all non-remote). If not, it throws SettingsException error + * @param discoveryNodes current discovery nodes in the cluster + */ + private static void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) { + Set nodeTypes = discoveryNodes.getNodes() + .values() + .stream() + .map(DiscoveryNode::isRemoteStoreNode) + .collect(Collectors.toSet()); + if (nodeTypes.size() != 1) { + throw new SettingsException( + "can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes" + ); + } + } + } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 64bea79c9e47b..2884ca9b345ef 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -52,6 +52,7 @@ import org.opensearch.cluster.block.ClusterBlock; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -100,7 +101,6 @@ import org.opensearch.indices.ShardLimitValidator; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.threadpool.ThreadPool; @@ -144,6 +144,7 @@ import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore; /** * Service responsible for submitting create index requests @@ -945,7 +946,7 @@ static Settings aggregateIndexSettings( indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); updateReplicationStrategy(indexSettingsBuilder, request.settings(), settings, combinedTemplateSettings); - updateRemoteStoreSettings(indexSettingsBuilder, settings); + updateRemoteStoreSettings(indexSettingsBuilder, currentState, clusterSettings, settings, request.index()); if (sourceMetadata != null) { assert request.resizeType() != null; @@ -1023,23 +1024,50 @@ private static void updateReplicationStrategy( /** * Updates index settings to enable remote store by default based on node attributes * @param settingsBuilder index settings builder to be updated with relevant settings + * @param clusterState state of cluster * @param clusterSettings cluster level settings + * @param nodeSettings node level settings + * @param indexName name of index */ 
- private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) { - if (isRemoteDataAttributePresent(clusterSettings)) { - settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) - .put( - SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, - clusterSettings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY - ) - ) - .put( - SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, - clusterSettings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY - ) - ); + public static void updateRemoteStoreSettings( + Settings.Builder settingsBuilder, + ClusterState clusterState, + ClusterSettings clusterSettings, + Settings nodeSettings, + String indexName + ) { + if (isRemoteDataAttributePresent(nodeSettings) || isMigratingToRemoteStore(clusterSettings)) { + String segmentRepo, translogRepo; + + Optional remoteNode = clusterState.nodes() + .getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStoreNode) + .findFirst(); + + if (remoteNode.isPresent()) { + translogRepo = remoteNode.get() + .getAttributes() + .get(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + segmentRepo = remoteNode.get() + .getAttributes() + .get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + if (segmentRepo != null && translogRepo != null) { + if (isMigratingToRemoteStore(clusterSettings)) { + settingsBuilder.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } + settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo) + .put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, translogRepo); + } else { + ValidationException validationException = new ValidationException(); + validationException.addValidationErrors( + Collections.singletonList("Cluster is migrating to remote store but no remote node found, failing index creation") + ); + throw new IndexCreationException(indexName, validationException); + } + } } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index 33b182dd3cc97..94b11086ba865 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -14,6 +14,7 @@ import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.RepositoriesService; @@ -223,4 +224,18 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode return existingRepositories; } } + + /** + * To check if the cluster is undergoing remote store migration + * @param clusterSettings cluster level settings + * @return + * true For REMOTE_STORE migration direction and MIXED compatibility mode, + * false otherwise + */ + public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings) { + boolean isMixedMode = clusterSettings.get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(CompatibilityMode.MIXED); + boolean isRemoteStoreMigrationDirection = 
clusterSettings.get(MIGRATION_DIRECTION_SETTING).equals(Direction.REMOTE_STORE); + + return (isMixedMode && isRemoteStoreMigrationDirection); + } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 538416e1137f5..814ed1681540b 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -16,10 +16,13 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.block.ClusterBlock; @@ -28,14 +31,18 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; @@ -44,6 +51,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; import org.opensearch.node.NodeClosedException; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; @@ -68,8 +76,18 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.SEGMENT_REPO; +import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.TRANSLOG_REPO; +import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.remoteStoreNodeAttributes; +import static org.opensearch.cluster.routing.allocation.RemoteStoreMigrationAllocationDeciderTests.getNonRemoteNode; +import static org.opensearch.cluster.routing.allocation.RemoteStoreMigrationAllocationDeciderTests.getRemoteNode; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static 
org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.opensearch.test.VersionUtils.randomCompatibleVersion; +import static org.opensearch.test.VersionUtils.randomOpenSearchVersion; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -692,4 +710,149 @@ protected void masterOperation(Task task, Request request, ClusterState state, A assertFalse(retried.get()); assertFalse(exception.get()); } + + public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() { + Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + + // request to change cluster compatibility mode to STRICT + Settings currentCompatibilityModeSettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) + .build(); + Settings intendedCompatibilityModeSettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.STRICT) + .build(); + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + request.persistentSettings(intendedCompatibilityModeSettings); + + // mixed cluster (containing both remote and non-remote nodes) + DiscoveryNode nonRemoteNode1 = getNonRemoteNode(); + DiscoveryNode remoteNode1 = getRemoteNode(); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode1) + .localNodeId(nonRemoteNode1.getId()) + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .build(); + + Metadata metadata = Metadata.builder().persistentSettings(currentCompatibilityModeSettings).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).nodes(discoveryNodes).build(); + + final SettingsException exception = expectThrows( + SettingsException.class, + () -> TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, clusterState) + ); + assertEquals( + "can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes", + exception.getMessage() + ); + + DiscoveryNode nonRemoteNode2 = getNonRemoteNode(); + DiscoveryNode remoteNode2 = getRemoteNode(); + + // cluster with only non-remote nodes + discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode1) + .localNodeId(nonRemoteNode1.getId()) + .add(nonRemoteNode2) + .localNodeId(nonRemoteNode2.getId()) + .build(); + ClusterState sameTypeClusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).build(); + TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState); + + // cluster with only non-remote nodes + discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .build(); + sameTypeClusterState = ClusterState.builder(sameTypeClusterState).nodes(discoveryNodes).build(); + TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState); + } + + public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersions() { + Settings nodeSettings = 
Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + + // request to change cluster compatibility mode + boolean toStrictMode = randomBoolean(); + Settings currentCompatibilityModeSettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) + .build(); + Settings intendedCompatibilityModeSettings = Settings.builder() + .put( + REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), + toStrictMode ? RemoteStoreNodeService.CompatibilityMode.STRICT : RemoteStoreNodeService.CompatibilityMode.MIXED + ) + .build(); + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + request.persistentSettings(intendedCompatibilityModeSettings); + + // two different but compatible open search versions for the discovery nodes + final Version version1 = randomOpenSearchVersion(random()); + final Version version2 = randomCompatibleVersion(random(), version1); + + assert version1.equals(version2) == false : "current nodes in the cluster must be of different versions"; + + DiscoveryNode discoveryNode1 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + toStrictMode ? remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO) : Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + version1 + ); + DiscoveryNode discoveryNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + toStrictMode ? remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO) : Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + version2 // not same as discoveryNode1 + ); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(discoveryNode1) + .localNodeId(discoveryNode1.getId()) + .add(discoveryNode2) + .localNodeId(discoveryNode2.getId()) + .build(); + + Metadata metadata = Metadata.builder().persistentSettings(currentCompatibilityModeSettings).build(); + + ClusterState differentVersionClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .nodes(discoveryNodes) + .build(); + + // changing compatibility mode when all nodes are not of the same version + final SettingsException exception = expectThrows( + SettingsException.class, + () -> TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, differentVersionClusterState) + ); + assertThat( + exception.getMessage(), + containsString("can not change the compatibility mode when all the nodes in cluster are not of the same version") + ); + + // changing compatibility mode when all nodes are of the same version + discoveryNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + toStrictMode ? 
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO) : Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + version1 // same as discoveryNode1 + ); + discoveryNodes = DiscoveryNodes.builder() + .add(discoveryNode1) + .localNodeId(discoveryNode1.getId()) + .add(discoveryNode2) + .localNodeId(discoveryNode2.getId()) + .build(); + + ClusterState sameVersionClusterState = ClusterState.builder(differentVersionClusterState).nodes(discoveryNodes).build(); + TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameVersionClusterState); + } + } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 5eafe63e63fad..875251183b802 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -922,16 +922,16 @@ private DiscoveryNode newDiscoveryNode(Map attributes) { ); } - private static final String SEGMENT_REPO = "segment-repo"; - private static final String TRANSLOG_REPO = "translog-repo"; + public static final String SEGMENT_REPO = "segment-repo"; + public static final String TRANSLOG_REPO = "translog-repo"; private static final String CLUSTER_STATE_REPO = "cluster-state-repo"; private static final String COMMON_REPO = "remote-repo"; - private Map remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) { + public static Map remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) { return remoteStoreNodeAttributes(segmentRepoName, translogRepoName, CLUSTER_STATE_REPO); } - private Map remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName, String clusterStateRepo) { + private static Map remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName, String clusterStateRepo) { String segmentRepositoryTypeAttributeKey = String.format( Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, @@ -968,7 +968,7 @@ private Map remoteStoreNodeAttributes(String segmentRepoName, St }; } - private Map remoteStateNodeAttributes(String clusterStateRepo) { + private static Map remoteStateNodeAttributes(String clusterStateRepo) { String clusterStateRepositoryTypeAttributeKey = String.format( Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index fa71b77648d35..51b09a20604c0 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -55,6 +55,7 @@ import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -62,6 +63,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import 
org.opensearch.common.xcontent.XContentFactory;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -82,6 +84,7 @@
 import org.opensearch.indices.SystemIndexDescriptor;
 import org.opensearch.indices.SystemIndices;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.node.remotestore.RemoteStoreNodeService;
 import org.opensearch.snapshots.EmptySnapshotsInfoService;
 import org.opensearch.test.ClusterServiceUtils;
 import org.opensearch.test.OpenSearchTestCase;
@@ -118,6 +121,8 @@
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.SEGMENT_REPO;
+import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.TRANSLOG_REPO;
 import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
 import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
 import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
@@ -136,6 +141,9 @@
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
+import static org.opensearch.cluster.routing.allocation.RemoteStoreMigrationAllocationDeciderTests.getNonRemoteNode;
+import static org.opensearch.cluster.routing.allocation.RemoteStoreMigrationAllocationDeciderTests.getRemoteNode;
+import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
 import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
 import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
 import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
@@ -150,6 +158,8 @@
 import static org.opensearch.node.Node.NODE_ATTRIBUTES;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
@@ -1349,18 +1359,20 @@ public void testClusterForceReplicationTypeInValidateIndexSettings() {
     }
 
     public void testRemoteStoreNoUserOverrideExceptReplicationTypeSegmentIndexSettings() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(getRemoteNode()).build())
+            .build();
         Settings settings = Settings.builder()
             .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT)
-            .put(segmentRepositoryNameAttributeKey, "my-segment-repo-1")
-            .put(translogRepositoryNameAttributeKey, "my-translog-repo-1")
+            .put(segmentRepositoryNameAttributeKey, SEGMENT_REPO)
+            .put(translogRepositoryNameAttributeKey, TRANSLOG_REPO)
             .build();
-
         request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
         final Settings.Builder requestSettings = Settings.builder();
         requestSettings.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
         request.settings(requestSettings.build());
         Settings indexSettings = aggregateIndexSettings(
-            ClusterState.EMPTY_STATE,
+            clusterState,
             request,
             Settings.EMPTY,
             null,
@@ -1373,24 +1385,27 @@ public void testRemoteStoreNoUserOverrideExceptReplicationTypeSegmentIndexSettin
         verifyRemoteStoreIndexSettings(
             indexSettings,
             "true",
-            "my-segment-repo-1",
-            "my-translog-repo-1",
+            SEGMENT_REPO,
+            TRANSLOG_REPO,
             ReplicationType.SEGMENT.toString(),
             IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
         );
     }
 
     public void testRemoteStoreImplicitOverrideReplicationTypeToSegmentForRemoteStore() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(getRemoteNode()).build())
+            .build();
         Settings settings = Settings.builder()
-            .put(segmentRepositoryNameAttributeKey, "my-segment-repo-1")
-            .put(translogRepositoryNameAttributeKey, "my-translog-repo-1")
+            .put(segmentRepositoryNameAttributeKey, SEGMENT_REPO)
+            .put(translogRepositoryNameAttributeKey, TRANSLOG_REPO)
             .build();
 
         request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
         final Settings.Builder requestSettings = Settings.builder();
         request.settings(requestSettings.build());
         Settings indexSettings = aggregateIndexSettings(
-            ClusterState.EMPTY_STATE,
+            clusterState,
             request,
             Settings.EMPTY,
             null,
@@ -1403,23 +1418,26 @@ public void testRemoteStoreImplicitOverrideReplicationTypeToSegmentForRemoteStor
         verifyRemoteStoreIndexSettings(
             indexSettings,
             "true",
-            "my-segment-repo-1",
-            "my-translog-repo-1",
+            SEGMENT_REPO,
+            TRANSLOG_REPO,
             ReplicationType.SEGMENT.toString(),
             IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
         );
     }
 
     public void testRemoteStoreNoUserOverrideIndexSettings() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(getRemoteNode()).build())
+            .build();
         Settings settings = Settings.builder()
             .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
-            .put(segmentRepositoryNameAttributeKey, "my-segment-repo-1")
-            .put(translogRepositoryNameAttributeKey, "my-translog-repo-1")
+            .put(segmentRepositoryNameAttributeKey, SEGMENT_REPO)
+            .put(translogRepositoryNameAttributeKey, TRANSLOG_REPO)
             .build();
 
         request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
         Settings indexSettings = aggregateIndexSettings(
-            ClusterState.EMPTY_STATE,
+            clusterState,
             request,
             Settings.EMPTY,
             null,
@@ -1432,8 +1450,8 @@ public void testRemoteStoreNoUserOverrideIndexSettings() {
         verifyRemoteStoreIndexSettings(
             indexSettings,
             "true",
-            "my-segment-repo-1",
-            "my-translog-repo-1",
+            SEGMENT_REPO,
+            TRANSLOG_REPO,
             ReplicationType.SEGMENT.toString(),
             IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
         );
@@ -1551,6 +1569,117 @@ public void testRemoteStoreOverrideTranslogRepoIndexSettings() {
         }));
     }
 
+    public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode() {
+        FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
+
+        // non-remote cluster manager node
+        DiscoveryNode nonRemoteClusterManagerNode = getNonRemoteNode();
+
+        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
+            .add(nonRemoteClusterManagerNode)
+            .localNodeId(nonRemoteClusterManagerNode.getId())
+            .build();
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
+
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+
+        request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+
+        Settings indexSettings = aggregateIndexSettings(
+            clusterState,
+            request,
+            Settings.EMPTY,
+            null,
+            Settings.EMPTY,
+            IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+            randomShardLimitService(),
+            Collections.emptySet(),
+            clusterSettings
+        );
+
+        verifyRemoteStoreIndexSettings(
+            indexSettings,
+            null,
+            null,
+            null,
+            ReplicationType.DOCUMENT.toString(),
+            IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
+        );
+
+        // remote data node
+        DiscoveryNode remoteDataNode = getRemoteNode();
+
+        discoveryNodes = DiscoveryNodes.builder(discoveryNodes).add(remoteDataNode).localNodeId(remoteDataNode.getId()).build();
+
+        clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
+
+        Settings remoteStoreMigrationSettings = Settings.builder()
+            .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED)
+            .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
+            .build();
+
+        clusterSettings = new ClusterSettings(remoteStoreMigrationSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+
+        indexSettings = aggregateIndexSettings(
+            clusterState,
+            request,
+            Settings.EMPTY,
+            null,
+            Settings.EMPTY,
+            IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+            randomShardLimitService(),
+            Collections.emptySet(),
+            clusterSettings
+        );
+
+        verifyRemoteStoreIndexSettings(
+            indexSettings,
+            "true",
+            SEGMENT_REPO,
+            TRANSLOG_REPO,
+            ReplicationType.SEGMENT.toString(),
+            IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
+        );
+
+        Map<String, String> missingTranslogAttribute = Map.of(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
+
+        DiscoveryNodes finalDiscoveryNodes = DiscoveryNodes.builder()
+            .add(nonRemoteClusterManagerNode)
+            .add(
+                new DiscoveryNode(
+                    UUIDs.base64UUID(),
+                    buildNewFakeTransportAddress(),
+                    missingTranslogAttribute,
+                    Set.of(DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE, DiscoveryNodeRole.DATA_ROLE),
+                    Version.CURRENT
+                )
+            )
+            .build();
+
+        ClusterState finalClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(finalDiscoveryNodes).build();
+        ClusterSettings finalClusterSettings = clusterSettings;
+
+        final IndexCreationException error = expectThrows(IndexCreationException.class, () -> {
+            aggregateIndexSettings(
+                finalClusterState,
+                request,
+                Settings.EMPTY,
+                null,
+                Settings.EMPTY,
+                IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+                randomShardLimitService(),
+                Collections.emptySet(),
+                finalClusterSettings
+            );
+        });
+
+        assertThat(
+            error.getCause().getMessage(),
+            containsString("Cluster is migrating to remote store but no remote node found, failing index creation")
+        );
+    }
+
     public void testBuildIndexMetadata() {
         IndexMetadata sourceIndexMetadata = IndexMetadata.builder("parent")
             .settings(Settings.builder().put("index.version.created", Version.CURRENT).build())
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java
index 43363407d9249..808a548cceeb0 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java
@@ -60,16 +60,16 @@
 import org.opensearch.node.remotestore.RemoteStoreNodeService;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Locale;
-import java.util.Map;
 
+import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.SEGMENT_REPO;
+import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.TRANSLOG_REPO;
+import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.remoteStoreNodeAttributes;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
 import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
-import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
 import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
 import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
 import static org.hamcrest.core.Is.is;
@@ -659,21 +659,16 @@ private ClusterState getInitialClusterState(
     }
 
     // get a dummy non-remote node
-    private DiscoveryNode getNonRemoteNode() {
+    public static DiscoveryNode getNonRemoteNode() {
         return new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
     }
 
     // get a dummy remote node
-    public DiscoveryNode getRemoteNode() {
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(
-            REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
-            "REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"
-        );
+    public static DiscoveryNode getRemoteNode() {
         return new DiscoveryNode(
             UUIDs.base64UUID(),
             buildNewFakeTransportAddress(),
-            attributes,
+            remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
             DiscoveryNodeRole.BUILT_IN_ROLES,
             Version.CURRENT
         );