Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add remote store restore API implementation #3642

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is added so that corresponding code in RestoreService is compiled without any errors. This file is same as defined here: #3576

* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Restore remote store request
*
* @opensearch.internal
*/
public class RestoreRemoteStoreRequest extends ClusterManagerNodeRequest<RestoreRemoteStoreRequest> implements ToXContentObject {

private String[] indices = Strings.EMPTY_ARRAY;
private Boolean waitForCompletion;

public RestoreRemoteStoreRequest() {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't indices be in constructor args(mandatory)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled by overriding ActionRequet's validate() method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant just from a stand alone class perspective

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. It is a bit tricky as it depends on how RestRestoreRemoteStoreAction initializes the RestoreRemoteStoreRequest.

https://github.com/opensearch-project/OpenSearch/pull/3576/files#diff-03a57182daff0959e24e2fccf7fe28e7dedcd711ef76758e24df834dc0e884b7R41

As indices is a part of post body, fetching it from RestRequest requires parsing the body content. This code of parsing is added as part of source method of the same class.

I just followed the conventions used by other Request classes but open to suggestions.


public RestoreRemoteStoreRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
waitForCompletion = in.readOptionalBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
out.writeOptionalBoolean(waitForCompletion);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (indices == null || indices.length == 0) {
validationException = addValidationError("indices are missing", validationException);
}
return validationException;
}

/**
* Sets the list of indices that should be restored from the remote store
* <p>
* The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will index all indices with
* prefix "test" except index "test42". Aliases are not supported. An empty list or {"_all"} will restore all open
* indices in the cluster.
*
* @param indices list of indices
* @return this request
*/
public RestoreRemoteStoreRequest indices(String... indices) {
this.indices = indices;
return this;
}

/**
* Sets the list of indices that should be restored from the remote store
* <p>
* The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will index all indices with
* prefix "test" except index "test42". Aliases are not supported. An empty list or {"_all"} will restore all open
* indices in the cluster.
*
* @param indices list of indices
* @return this request
*/
public RestoreRemoteStoreRequest indices(List<String> indices) {
this.indices = indices.toArray(new String[indices.size()]);
return this;
}

/**
* Returns list of indices that should be restored from the remote store
*/
public String[] indices() {
return indices;
}

/**
* If this parameter is set to true the operation will wait for completion of restore process before returning.
*
* @param waitForCompletion if true the operation will wait for completion
* @return this request
*/
public RestoreRemoteStoreRequest waitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

/**
* Returns wait for completion setting
*
* @return true if the operation will wait for completion
*/
public boolean waitForCompletion() {
return waitForCompletion;
}

/**
* Parses restore definition
*
* @param source restore definition
* @return this request
*/
@SuppressWarnings("unchecked")
public RestoreRemoteStoreRequest source(Map<String, Object> source) {
for (Map.Entry<String, Object> entry : source.entrySet()) {
String name = entry.getKey();
if (name.equals("indices")) {
if (entry.getValue() instanceof String) {
indices(Strings.splitStringByCommaToArray((String) entry.getValue()));
} else if (entry.getValue() instanceof ArrayList) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be ArrayList?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As indices is part of post body, it is better to get it as map from the XContentParser. This will help in easily adding new fields to the post body.

indices((ArrayList<String>) entry.getValue());
} else {
throw new IllegalArgumentException("malformed indices section, should be an array of strings");
}
} else {
if (IndicesOptions.isIndicesOptions(name) == false) {
throw new IllegalArgumentException("Unknown parameter " + name);
}
}
}
return this;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("indices");
for (String index : indices) {
builder.value(index);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public String getDescription() {
return "remote_store";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o;
return waitForCompletion == that.waitForCompletion && Arrays.equals(indices, that.indices);
}

@Override
public int hashCode() {
int result = Objects.hash(waitForCompletion);
result = 31 * result + Arrays.hashCode(indices);
return result;
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Restore Snapshot transport handler. */
package org.opensearch.action.admin.cluster.remotestore.restore;
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.cluster.routing.RecoverySource.LocalShardsRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
import org.opensearch.common.Randomness;
import org.opensearch.common.collect.ImmutableOpenIntMap;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -444,12 +445,33 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery
return initializeAsRestore(indexMetadata, recoverySource, null, false, unassignedInfo);
}

/**
* Initializes an existing index, to be restored from remote store
*/
public Builder initializeAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) {
final UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
"restore_source[remote_store]"
);
assert indexMetadata.getIndex().equals(index);
if (!shards.isEmpty()) {
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo));
shards.put(shardNumber, indexShardRoutingBuilder.build());
}
return this;
}

/**
* Initializes an index, to be restored from snapshot
*/
private Builder initializeAsRestore(
IndexMetadata indexMetadata,
SnapshotRecoverySource recoverySource,
RecoverySource recoverySource,
IntSet ignoreShards,
boolean asNew,
UnassignedInfo unassignedInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public static RecoverySource readFrom(StreamInput in) throws IOException {
return new SnapshotRecoverySource(in);
case LOCAL_SHARDS:
return LocalShardsRecoverySource.INSTANCE;
case REMOTE_STORE:
return new RemoteStoreRecoverySource(in);
default:
throw new IllegalArgumentException("unknown recovery type: " + type.name());
}
Expand Down Expand Up @@ -116,7 +118,8 @@ public enum Type {
EXISTING_STORE,
PEER,
SNAPSHOT,
LOCAL_SHARDS
LOCAL_SHARDS,
REMOTE_STORE
}

public abstract Type getType();
Expand Down Expand Up @@ -349,6 +352,97 @@ public int hashCode() {

}

/**
* Recovery from remote store
*
* @opensearch.internal
*/
public static class RemoteStoreRecoverySource extends RecoverySource {

private final String restoreUUID;
private final IndexId index;
private final Version version;

public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId) {
this.restoreUUID = restoreUUID;
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
}

RemoteStoreRecoverySource(StreamInput in) throws IOException {
restoreUUID = in.readString();
version = Version.readVersion(in);
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
index = new IndexId(in);
} else {
index = new IndexId(in.readString(), IndexMetadata.INDEX_UUID_NA_VALUE);
}
}

public String restoreUUID() {
return restoreUUID;
}

/**
* Gets the {@link IndexId} of the recovery source. May contain {@link IndexMetadata#INDEX_UUID_NA_VALUE} as the index uuid if it
* was created by an older version cluster-manager in a mixed version cluster.
*
* @return IndexId
*/
public IndexId index() {
return index;
}

public Version version() {
return version;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Version.writeVersion(version, out);
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
index.writeTo(out);
} else {
out.writeString(index.getName());
}
}

@Override
public Type getType() {
return Type.REMOTE_STORE;
}

@Override
public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("version", version.toString()).field("index", index.getName()).field("restoreUUID", restoreUUID);
}

@Override
public String toString() {
return "remote store recovery [" + restoreUUID + "]";
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RemoteStoreRecoverySource that = (RemoteStoreRecoverySource) o;
return restoreUUID.equals(that.restoreUUID) && index.equals(that.index) && version.equals(that.version);
}

@Override
public int hashCode() {
return Objects.hash(restoreUUID, index, version);
}

}

/**
* peer recovery from a primary shard
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -564,6 +565,13 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) {
return add(indexRoutingBuilder);
}

public Builder addAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex())
.initializeAsRemoteStoreRestore(indexMetadata, recoverySource);
add(indexRoutingBuilder);
return this;
}

public Builder addAsRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsRestore(
indexMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,13 +514,7 @@ public synchronized IndexShard createShard(
this.indexSettings,
path
);
remoteStore = new Store(
shardId,
this.indexSettings,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Expand Down
Loading