Skip to content

Commit

Permalink
prototype for in-place update
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Sep 16, 2024
1 parent 2e98df3 commit d615f68
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
} else {
logger.info("Adding docrep node");
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); //.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteACMIT extends MigrationBaseTestCase {
private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
/* Adding the following mock plugins:
- InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync
- MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated
*/
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class)
).collect(Collectors.toList());
}

public Settings.Builder remotePublishNodeSetting() {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
String segmentRepoName = "test-remote-store-repo";
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String routingTableRepoName = "remote-routing-repo";
String routingTableRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
routingTableRepoName
);
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);

Settings.Builder builder = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
}

public void testACMJoiningNonACMCluster() throws Exception {
// non-acm cluster-manager
String cmName = internalCluster().startClusterManagerOnlyNode();
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();

Settings.Builder build = remotePublishNodeSetting();

// have the node settings for repo
internalCluster().startDataOnlyNodes(2, build.build());
build.put(REMOTE_PUBLICATION_SETTING_KEY, true);

// acm cluster-manager
String cm2 = internalCluster().startClusterManagerOnlyNode(build.build());

// stop-non-acm cluster-manager
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(cmName));

// update cluster-state via remote
createIndex(REMOTE_PRI_DOCREP_REP, oneReplica);
ensureGreen();
}

// failure modes during ORR
// tms issue
// flee
public void testNonACMJoiningACMCluster() throws Exception {

// create - repository from external call

Settings.Builder build = remotePublishNodeSetting();
build.put(REMOTE_PUBLICATION_SETTING_KEY, true);

// acm cluster-manager
String cmName = internalCluster().startClusterManagerOnlyNode(build.build());
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();

// have the node settings for repo
internalCluster().startDataOnlyNodes(2, build.build());

// non-acm cluster-manager
String cm2 = internalCluster().startClusterManagerOnlyNode();

// stop acm cluster-manager
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(cmName));

// update cluster-state via remote
createIndex(REMOTE_PRI_DOCREP_REP, oneReplica);
ensureGreen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
Expand Down Expand Up @@ -131,6 +133,8 @@ public class JoinHelper {
private final NamedWriteableRegistry namedWriteableRegistry;
private final AtomicReference<Tuple<Long, BytesReference>> serializedState = new AtomicReference<>();

private final boolean remoteStateEnabled;

JoinHelper(
Settings settings,
AllocationService allocationService,
Expand All @@ -153,7 +157,8 @@ public class JoinHelper {
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissioned = nodeCommissioned;
this.namedWriteableRegistry = namedWriteableRegistry;

this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteStateEnabled = RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings);
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(
settings,
allocationService,
Expand Down Expand Up @@ -227,6 +232,9 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
ValidateJoinRequest::new,
(request, channel, task) -> {
runJoinValidators(currentStateSupplier, request.getState(), joinValidators);
if (request.isRemoteStateEnabled) {
remoteStoreNodeService.createAndVerifyRepositories(currentStateSupplier.get().nodes().getLocalNode());
}
channel.sendResponse(Empty.INSTANCE);
}
);
Expand All @@ -243,6 +251,8 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta

}

private final RemoteStoreNodeService remoteStoreNodeService;

private void runJoinValidators(
Supplier<ClusterState> currentStateSupplier,
ClusterState incomingState,
Expand Down Expand Up @@ -455,11 +465,11 @@ public String executor() {
}

public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
if (node.getVersion().before(Version.V_2_9_0)) {
if (node.getVersion().after(Version.V_2_9_0)) {
transportService.sendRequest(
node,
VALIDATE_JOIN_ACTION_NAME,
new ValidateJoinRequest(state),
new ValidateJoinRequest(state, remoteStateEnabled),
new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC)
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false;
ClusterState.Builder newState;
DiscoveryNode remotePublishNode = null;

if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
return results.successes(joiningNodes).build(currentState);
Expand All @@ -168,6 +169,9 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough
newState = becomeClusterManagerAndTrimConflictingNodes(currentState, joiningNodes);
if (currentNodes.getLocalNode().isRemoteStatePublicationEnabled()) {
remotePublishNode = currentNodes.getLocalNode();
}
nodesChanged = true;
} else if (currentNodes.isLocalNodeElectedClusterManager() == false) {
logger.trace(
Expand All @@ -185,12 +189,20 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
RepositoriesMetadata repositoriesMetadata = null;

if (remotePublishNode != null) {
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remotePublishNode,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
} else {
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
}
assert nodesBuilder.isLocalNodeElectedClusterManager();

Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.IncompatibleClusterStateVersionException;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -69,12 +70,16 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* Transport handler for publication
*
* @opensearch.internal
*/
public class PublicationTransportHandler {
public class PublicationTransportHandler

{

private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);

Expand Down Expand Up @@ -240,6 +245,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
remoteClusterStateService.start();
boolean applyFullState = false;
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
Expand Down Expand Up @@ -354,12 +360,24 @@ public PublicationContext newPublicationContext(
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean();
// validate if repoMetadata is present in the cluster-state before starting remote-publication
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
RepositoriesMetadata custom = clusterChangedEvent.previousState().getMetadata().custom(RepositoriesMetadata.TYPE);
String clusterStateRepo = clusterChangedEvent.state()
.nodes()
.getLocalNode()
.getAttributes()
.get(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY);
boolean isRepoInitialized = custom != null && custom.repository(clusterStateRepo) != null;
if (isRepoInitialized) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
}

if (allNodesRemotePublicationEnabled.get() == true) {
// if all nodes are remote then create remote publication context
return new RemotePublicationContext(clusterChangedEvent, persistedStateRegistry);
Expand Down Expand Up @@ -542,7 +560,7 @@ public String executor() {
}

public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
logger.debug("sending cluster state over transport to node: {}", destination.getName());
logger.info("sending cluster state over transport to node: {}", destination.getName());
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,28 @@
public class ValidateJoinRequest extends TransportRequest {
private ClusterState state;

boolean isRemoteStateEnabled;

public ValidateJoinRequest(StreamInput in) throws IOException {
super(in);
this.state = ClusterState.readFrom(in, null);
this.isRemoteStateEnabled = in.readBoolean();
}

public ValidateJoinRequest(ClusterState state) {
this.state = state;
}

public ValidateJoinRequest(ClusterState state, boolean isRemoteStateEnabled) {
this.state = state;
this.isRemoteStateEnabled = isRemoteStateEnabled;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.state.writeTo(out);
out.writeBoolean(this.isRemoteStateEnabled);
}

public ClusterState getState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -1042,12 +1043,24 @@ public void close() throws IOException {
}

public void start() {

assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);

if (remoteStoreRepo == null) {
return;
}
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
Repository repository = null;
try {
repository = repositoriesService.get().repository(remoteStoreRepo);
logger.info("found repo");
} catch (RepositoryMissingException missingexp) {
logger.error("exception while publish", missingexp);
return;
}
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();
Expand Down
Loading

0 comments on commit d615f68

Please sign in to comment.