Skip to content

Commit

Permalink
[Remote Store] Changes to perform repository registration during boot…
Browse files Browse the repository at this point in the history
…strap via node attributes.

Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent f9a661c commit d6c7866
Show file tree
Hide file tree
Showing 12 changed files with 478 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
- [Remote Store] Changes to introduce repository registration during bootstrap via node attributes. ([#9105](https://github.com/opensearch-project/OpenSearch/pull/9105))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore;

import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * This is an extension of {@code DiscoveryNode} which provides an abstraction for validating and storing information
 * specific to remote backed storage nodes.
 *
 * @opensearch.internal
 */
public class RemoteStoreNode extends DiscoveryNode {

    public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store";
    public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository";
    public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository";
    public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
    public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
    private final DiscoveryNode node;
    private final RepositoriesMetadata repositoriesMetadata;

    /**
     * Creates a new {@link RemoteStoreNode} wrapping the given node, eagerly building (and validating)
     * the repositories metadata encoded in its node attributes.
     *
     * @param node the discovery node carrying remote store node attributes
     * @throws IllegalStateException if a required remote store attribute is missing or empty
     */
    public RemoteStoreNode(DiscoveryNode node) {
        super(node.getName(), node.getId(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion());
        this.node = node;
        this.repositoriesMetadata = buildRepositoriesMetadata(node);
    }

    /**
     * Returns the attribute value for {@code attributeKey}, failing fast with a descriptive
     * {@link IllegalStateException} if the attribute is absent or empty.
     */
    private String validateAttributeNonNull(DiscoveryNode node, String attributeKey) {
        String attributeValue = node.getAttributes().get(attributeKey);
        if (attributeValue == null || attributeValue.isEmpty()) {
            throw new IllegalStateException("joining node [" + node + "] doesn't have the node attribute [" + attributeKey + "].");
        }

        return attributeValue;
    }

    /**
     * Collects every attribute whose key starts with {@code settingsAttributeKeyPrefix} into a map keyed
     * by the attribute name with the prefix stripped, validating each value is non-null and non-empty.
     */
    private Map<String, String> validateSettingsAttributesNonNull(DiscoveryNode node, String settingsAttributeKeyPrefix) {
        return node.getAttributes()
            .keySet()
            .stream()
            .filter(key -> key.startsWith(settingsAttributeKeyPrefix))
            .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> validateAttributeNonNull(node, key)));
    }

    // TODO: Add logic to mark these repositories as System Repository once that's merged.
    private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
        // Locale.ROOT keeps attribute-key formatting locale-independent; Locale.getDefault() here was a
        // latent bug since these keys must be identical across nodes regardless of JVM locale.
        String type = validateAttributeNonNull(node, String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
        Map<String, String> settingsMap = validateSettingsAttributesNonNull(
            node,
            String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name)
        );

        Settings.Builder settings = Settings.builder();
        settingsMap.forEach(settings::put);

        return new RepositoryMetadata(name, type, settings.build());
    }

    /**
     * Builds the repositories metadata from the segment and translog repository name attributes.
     * When both attributes point at the same repository only a single entry is created.
     *
     * @throws IllegalStateException if either repository name attribute is missing or empty
     */
    private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
        // Validate the names up front so a missing attribute fails with a descriptive message instead of
        // a NullPointerException on the equals() call below.
        String segmentRepositoryName = validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
        String translogRepositoryName = validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
        if (segmentRepositoryName.equals(translogRepositoryName)) {
            return new RepositoriesMetadata(Collections.singletonList(buildRepositoryMetadata(node, segmentRepositoryName)));
        } else {
            List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
            repositoryMetadataList.add(buildRepositoryMetadata(node, segmentRepositoryName));
            repositoryMetadataList.add(buildRepositoryMetadata(node, translogRepositoryName));
            return new RepositoriesMetadata(repositoryMetadataList);
        }
    }

    RepositoriesMetadata getRepositoriesMetadata() {
        return this.repositoriesMetadata;
    }

    @Override
    public int hashCode() {
        // We will hash the ephemeral id and repositories metadata as it is highly unlikely that two nodes
        // will have the same ephemeral id and repositories metadata but are actually different.
        // NOTE(review): equals() below compares only repositoriesMetadata, so two instances can be equal
        // yet hash differently (equals/hashCode contract violation) — confirm intended usage before
        // putting RemoteStoreNode into a hash-based collection.
        return Objects.hash(node.getEphemeralId(), repositoriesMetadata);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        RemoteStoreNode that = (RemoteStoreNode) o;

        return this.getRepositoriesMetadata().equalsIgnoreGenerations(that.getRepositoriesMetadata());
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append('{').append(this.node).append('}');
        sb.append('{').append(this.repositoriesMetadata).append('}');
        // Bug fix: previously returned super.toString(), silently discarding the builder above.
        return sb.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.action.ActionListener;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Supplier;

/**
 * Contains all the methods needed for a remote store node lifecycle.
 */
public class RemoteStoreService {

    private static final Logger logger = LogManager.getLogger(RemoteStoreService.class);
    private final Supplier<RepositoriesService> repositoriesService;
    public static final Setting<String> REMOTE_STORE_MIGRATION_SETTING = Setting.simpleString(
        "remote_store.migration",
        MigrationTypes.NOT_MIGRATING.value,
        MigrationTypes::validate,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /**
     * The remote store migration states a cluster can be in; also acts as the validator for
     * {@link #REMOTE_STORE_MIGRATION_SETTING}.
     */
    public enum MigrationTypes {
        NOT_MIGRATING("not_migrating"),
        MIGRATING_TO_REMOTE_STORE("migrating_to_remote_store"),
        MIGRATING_TO_HOT("migrating_to_hot");

        /**
         * Parses a migration type string (case-insensitive) into its enum constant.
         *
         * @throws IllegalArgumentException if the string does not name a supported migration type
         */
        public static MigrationTypes validate(String migrationType) {
            try {
                return MigrationTypes.valueOf(migrationType.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                // List.of(...) renders the constants as "[A, B, C]"; the previous values().toString()
                // printed the array's identity hash, which was useless to the user. Preserve the cause.
                throw new IllegalArgumentException(
                    "[" + migrationType + "] migration type is not supported. " + "Supported migration types are " + List.of(
                        MigrationTypes.values()
                    ),
                    e
                );
            }
        }

        /** The lowercase string form used in the node setting. */
        public final String value;

        MigrationTypes(String value) {
            this.value = value;
        }
    }

    public RemoteStoreService(Supplier<RepositoriesService> repositoriesService) {
        this.repositoriesService = repositoriesService;
    }

    /**
     * Kicks off an asynchronous verification of the given repository, logging on success and throwing
     * {@link IllegalStateException} from the listener on failure.
     */
    public void verifyRepository(RepositoryMetadata repositoryMetadata) {
        ActionListener<VerifyRepositoryResponse> listener = new ActionListener<>() {

            @Override
            public void onResponse(VerifyRepositoryResponse verifyRepositoryResponse) {
                logger.info("Successfully verified repository : {}", verifyRepositoryResponse);
            }

            @Override
            public void onFailure(Exception e) {
                // Preserve the original exception as the cause instead of flattening it to getMessage().
                throw new IllegalStateException("Failed to finish remote store repository verification", e);
            }
        };

        repositoriesService.get()
            .verifyRepository(
                repositoryMetadata.name(),
                ActionListener.delegateFailure(
                    listener,
                    (delegatedListener, verifyResponse) -> delegatedListener.onResponse(
                        new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))
                    )
                )
            );
    }

    /**
     * Registers the given repository in the returned cluster state. If an entry with the same name but
     * different metadata already exists the join is rejected; if an identical entry exists the current
     * state is returned unchanged.
     *
     * @throws IllegalStateException if a same-named repository with different metadata already exists
     */
    private ClusterState createRepository(RepositoryMetadata newRepositoryMetadata, ClusterState currentState) {
        RepositoriesService.validate(newRepositoryMetadata.name());

        Metadata metadata = currentState.metadata();
        Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
        RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
        if (repositories == null) {
            Repository repository = repositoriesService.get().createRepository(newRepositoryMetadata);
            logger.info(
                "Remote store repository with name {} and type {} created.",
                repository.getMetadata().name(),
                repository.getMetadata().type()
            );
            repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata));
        } else {
            List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

            for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
                if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
                    if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
                        // Same repository already registered — nothing to add.
                        return new ClusterState.Builder(currentState).build();
                    } else {
                        throw new IllegalStateException(
                            "new repository metadata ["
                                + newRepositoryMetadata
                                + "] supplied by joining node is different from existing repository metadata ["
                                + repositoryMetadata
                                + "]."
                        );
                    }
                } else {
                    repositoriesMetadata.add(repositoryMetadata);
                }
            }
            Repository repository = repositoriesService.get().createRepository(newRepositoryMetadata);
            logger.info(
                "Remote store repository with name {} and type {} created",
                repository.getMetadata().name(),
                repository.getMetadata().type()
            );
            repositoriesMetadata.add(newRepositoryMetadata);
            repositories = new RepositoriesMetadata(repositoriesMetadata);
        }
        mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories);
        return ClusterState.builder(currentState).metadata(mdBuilder).build();
    }

    /** Returns true if the repository is already registered with the local {@link RepositoriesService}. */
    private boolean isRepositoryCreated(RepositoryMetadata repositoryMetadata) {
        try {
            repositoriesService.get().repository(repositoryMetadata.name());
            return true;
        } catch (RepositoryMissingException e) {
            return false;
        }
    }

    /**
     * Returns true if an equivalent repository (ignoring generations) is already present in the
     * cluster state's repositories metadata.
     */
    private boolean isRepositoryAddedInClusterState(RepositoryMetadata repositoryMetadata, ClusterState currentState) {
        RepositoriesMetadata repositoriesMetadata = currentState.metadata().custom(RepositoriesMetadata.TYPE);
        if (repositoriesMetadata == null) {
            return false;
        }
        for (RepositoryMetadata existingRepositoryMetadata : repositoriesMetadata.repositories()) {
            // Bug fix: the comparison result was previously ignored and true was returned for the first
            // entry unconditionally, so any pre-existing repository made every lookup "present".
            if (existingRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
                return true;
            }
        }
        return false;
    }

    /**
     * For each repository: verify it if it is already created locally, otherwise register it in the
     * returned cluster state (unless it is already present there).
     */
    private ClusterState createOrVerifyRepository(RepositoriesMetadata repositoriesMetadata, ClusterState currentState) {
        ClusterState newState = ClusterState.builder(currentState).build();
        for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) {
            if (isRepositoryCreated(repositoryMetadata)) {
                verifyRepository(repositoryMetadata);
            } else {
                if (!isRepositoryAddedInClusterState(repositoryMetadata, currentState)) {
                    newState = ClusterState.builder(createRepository(repositoryMetadata, newState)).build();
                    // TODO: verification of freshly created repositories is intentionally disabled for now.
                }
            }
        }
        return newState;
    }

    /**
     * Handles a remote store node joining the cluster: validates compatibility against an existing node
     * and registers/verifies the joining node's repositories in the returned cluster state.
     *
     * @throws IllegalStateException if a remote store node tries to join a non remote store cluster
     */
    public ClusterState joinCluster(RemoteStoreNode joiningRemoteStoreNode, ClusterState currentState) {
        List<DiscoveryNode> existingNodes = new ArrayList<>(currentState.nodes().getNodes().values());
        if (existingNodes.isEmpty()) {
            return currentState;
        }
        ClusterState.Builder newState = ClusterState.builder(currentState);
        // NOTE(review): only the first existing node is inspected here — presumably all members of a
        // remote store cluster share the same repository attributes; confirm that invariant holds.
        if (existingNodes.get(0).isRemoteStoreNode()) {
            RemoteStoreNode existingRemoteStoreNode = new RemoteStoreNode(existingNodes.get(0));
            if (existingRemoteStoreNode.equals(joiningRemoteStoreNode)) {
                newState = ClusterState.builder(createOrVerifyRepository(joiningRemoteStoreNode.getRepositoriesMetadata(), currentState));
            }
        } else {
            throw new IllegalStateException(
                "a remote store node [" + joiningRemoteStoreNode + "] is trying to join a non remote store cluster."
            );
        }
        return newState.build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -181,6 +182,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final RemoteStoreService remoteStoreService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -201,7 +203,8 @@ public Coordinator(
Random random,
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
RemoteStoreService remoteStoreService
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -223,7 +226,8 @@ public Coordinator(
rerouteService,
nodeHealthService,
this::onNodeCommissionStatusChange,
namedWriteableRegistry
namedWriteableRegistry,
remoteStoreService
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
Expand Down Expand Up @@ -287,6 +291,7 @@ public Coordinator(
);
this.nodeHealthService = nodeHealthService;
this.localNodeCommissioned = true;
this.remoteStoreService = remoteStoreService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -605,6 +610,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
// we are checking source node commission status here to reject any join request coming from a decommissioned node
// even before executing the join task to fail fast
JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());

JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation);
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
} else {
Expand Down
Loading

0 comments on commit d6c7866

Please sign in to comment.