Skip to content

Commit

Permalink
Split the remote global metadata file to metadata attribute files (#1…
Browse files Browse the repository at this point in the history
…2190)

* Split the cluster state remote global metadata file to metadata attribute files

Signed-off-by: Shivansh Arora <hishiv@amazon.com>
(cherry picked from commit da3ab92)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed May 16, 2024
1 parent d4c6b92 commit 90e767a
Show file tree
Hide file tree
Showing 11 changed files with 1,883 additions and 420 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.upgrades;

import org.opensearch.client.Request;
import org.opensearch.client.Response;

import java.util.Map;

public class ClusterStateIT extends AbstractRollingTestCase{
public void testTemplateMetadataUpgrades() throws Exception {
if (CLUSTER_TYPE == ClusterType.OLD) {
String templateName = "my_template";
Request putIndexTemplate = new Request("PUT", "_template/" + templateName);
putIndexTemplate.setJsonEntity("{\"index_patterns\": [\"pattern-1\", \"log-*\"]}");
client().performRequest(putIndexTemplate);
verifyTemplateMetadataInClusterState();
} else {
verifyTemplateMetadataInClusterState();
}
}

@SuppressWarnings("unchecked")
private static void verifyTemplateMetadataInClusterState() throws Exception {
Request request = new Request("GET", "_cluster/state/metadata");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> metadata = (Map<String, Object>) entityAsMap(response).get("metadata");
assertNotNull(metadata.get("templates"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -181,6 +189,45 @@ public void testRemoteStateStatsFromAllNodes() {
}
}

public void testRemoteClusterStateMetadataSplit() throws IOException {
initialTestSetup(1, 0, 1, 1);

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath globalMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID())
.add("global-metadata");

Map<String, Integer> metadataFiles = repository.blobStore()
.blobContainer(globalMetadataPath)
.listBlobs()
.keySet()
.stream()
.map(fileName -> {
logger.info(fileName);
return fileName.split(DELIMITER)[0];
})
.collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));

assertTrue(metadataFiles.containsKey(COORDINATION_METADATA));
assertEquals(1, (int) metadataFiles.get(COORDINATION_METADATA));
assertTrue(metadataFiles.containsKey(SETTING_METADATA));
assertEquals(1, (int) metadataFiles.get(SETTING_METADATA));
assertTrue(metadataFiles.containsKey(TEMPLATES_METADATA));
assertEquals(1, (int) metadataFiles.get(TEMPLATES_METADATA));
assertTrue(metadataFiles.keySet().stream().anyMatch(key -> key.startsWith(CUSTOM_METADATA)));
assertFalse(metadataFiles.containsKey(METADATA_FILE_PREFIX));
}

private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
// _nodes/stats/discovery must never fail due to any exception
assertFalse(nodesStatsResponse.toString().contains("exception"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,30 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.datastream.DataStreamRolloverIT;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.ComponentTemplate;
import org.opensearch.cluster.metadata.ComponentTemplateMetadata;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand All @@ -29,11 +43,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING;
Expand All @@ -46,6 +62,11 @@

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {
static final String TEMPLATE_NAME = "remote-store-test-template";
static final String COMPONENT_TEMPLATE_NAME = "remote-component-template1";
static final String COMPOSABLE_TEMPLATE_NAME = "remote-composable-template1";
static final Setting<String> MOCK_SETTING = Setting.simpleString("mock-setting");
static final String[] EXCLUDED_NODES = { "ex-1", "ex-2" };

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Expand Down Expand Up @@ -87,6 +108,45 @@ public void testFullClusterRestore() throws Exception {
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();
long prevClusterStateVersion = clusterService().state().version();
// Step - 1.1 Add some cluster state elements
ActionFuture<AcknowledgedResponse> response = client().admin()
.indices()
.preparePutTemplate(TEMPLATE_NAME)
.addAlias(new Alias(INDEX_NAME))
.setPatterns(Arrays.stream(INDEX_NAMES_WILDCARD.split(",")).collect(Collectors.toList()))
.execute();
assertTrue(response.get().isAcknowledged());
ActionFuture<ClusterUpdateSettingsResponse> clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build())
.execute();
assertTrue(clusterUpdateSettingsResponse.get().isAcknowledged());
// update coordination metadata
client().execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(EXCLUDED_NODES));
// Add a custom metadata as component index template
ActionFuture<AcknowledgedResponse> componentTemplateResponse = client().execute(
PutComponentTemplateAction.INSTANCE,
new PutComponentTemplateAction.Request(COMPONENT_TEMPLATE_NAME).componentTemplate(
new ComponentTemplate(new Template(Settings.EMPTY, null, Collections.emptyMap()), 1L, Collections.emptyMap())
)
);
assertTrue(componentTemplateResponse.get().isAcknowledged());
ActionFuture<AcknowledgedResponse> composableTemplateResponse = client().execute(
PutComposableIndexTemplateAction.INSTANCE,
new PutComposableIndexTemplateAction.Request(COMPOSABLE_TEMPLATE_NAME).indexTemplate(
new ComposableIndexTemplate(
Arrays.stream(INDEX_NAMES_WILDCARD.split(",")).collect(Collectors.toList()),
new Template(Settings.EMPTY, null, Collections.emptyMap()),
Collections.singletonList(COMPONENT_TEMPLATE_NAME),
1L,
1L,
Collections.emptyMap(),
null
)
)
);
assertTrue(composableTemplateResponse.get().isAcknowledged());

// Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata
resetCluster(dataNodeCount, clusterManagerNodeCount);
Expand All @@ -104,7 +164,24 @@ public void testFullClusterRestore() throws Exception {
);
validateMetadata(List.of(INDEX_NAME));
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);

clusterService().state()
.metadata()
.coordinationMetadata()
.getVotingConfigExclusions()
.stream()
.forEach(config -> assertTrue(Arrays.stream(EXCLUDED_NODES).anyMatch(node -> node.equals(config.getNodeId()))));
assertFalse(clusterService().state().metadata().templates().isEmpty());
assertTrue(clusterService().state().metadata().templates().containsKey(TEMPLATE_NAME));
assertFalse(clusterService().state().metadata().settings().isEmpty());
assertFalse(clusterService().state().metadata().settings().getAsBoolean(SETTING_READ_ONLY_SETTING.getKey(), true));
assertNotNull(clusterService().state().metadata().custom("component_template"));
ComponentTemplateMetadata componentTemplateMetadata = clusterService().state().metadata().custom("component_template");
assertFalse(componentTemplateMetadata.componentTemplates().isEmpty());
assertTrue(componentTemplateMetadata.componentTemplates().containsKey(COMPONENT_TEMPLATE_NAME));
assertNotNull(clusterService().state().metadata().custom("index_template"));
ComposableIndexTemplateMetadata composableIndexTemplate = clusterService().state().metadata().custom("index_template");
assertFalse(composableIndexTemplate.indexTemplates().isEmpty());
assertTrue(composableIndexTemplate.indexTemplates().containsKey(COMPOSABLE_TEMPLATE_NAME));
}

/**
Expand Down
Loading

0 comments on commit 90e767a

Please sign in to comment.