Skip to content

Commit

Permalink
[Remote Store] Add remote store restore API implementation (opensearc…
Browse files Browse the repository at this point in the history
…h-project#3642)

* Add remote restore API implementation

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored and Sachin Kale committed Sep 1, 2022
1 parent 672eb62 commit 14b84f9
Show file tree
Hide file tree
Showing 22 changed files with 737 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Restore remote store request
*
* @opensearch.internal
*/
public class RestoreRemoteStoreRequest extends ClusterManagerNodeRequest<RestoreRemoteStoreRequest> implements ToXContentObject {

private String[] indices = Strings.EMPTY_ARRAY;
private Boolean waitForCompletion;

public RestoreRemoteStoreRequest() {}

public RestoreRemoteStoreRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
waitForCompletion = in.readOptionalBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
out.writeOptionalBoolean(waitForCompletion);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (indices == null || indices.length == 0) {
validationException = addValidationError("indices are missing", validationException);
}
return validationException;
}

/**
* Sets the list of indices that should be restored from the remote store
* <p>
* The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will index all indices with
* prefix "test" except index "test42". Aliases are not supported. An empty list or {"_all"} will restore all open
* indices in the cluster.
*
* @param indices list of indices
* @return this request
*/
public RestoreRemoteStoreRequest indices(String... indices) {
this.indices = indices;
return this;
}

/**
* Sets the list of indices that should be restored from the remote store
* <p>
* The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will index all indices with
* prefix "test" except index "test42". Aliases are not supported. An empty list or {"_all"} will restore all open
* indices in the cluster.
*
* @param indices list of indices
* @return this request
*/
public RestoreRemoteStoreRequest indices(List<String> indices) {
this.indices = indices.toArray(new String[indices.size()]);
return this;
}

/**
* Returns list of indices that should be restored from the remote store
*/
public String[] indices() {
return indices;
}

/**
* If this parameter is set to true the operation will wait for completion of restore process before returning.
*
* @param waitForCompletion if true the operation will wait for completion
* @return this request
*/
public RestoreRemoteStoreRequest waitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

/**
* Returns wait for completion setting
*
* @return true if the operation will wait for completion
*/
public boolean waitForCompletion() {
return waitForCompletion;
}

/**
* Parses restore definition
*
* @param source restore definition
* @return this request
*/
@SuppressWarnings("unchecked")
public RestoreRemoteStoreRequest source(Map<String, Object> source) {
for (Map.Entry<String, Object> entry : source.entrySet()) {
String name = entry.getKey();
if (name.equals("indices")) {
if (entry.getValue() instanceof String) {
indices(Strings.splitStringByCommaToArray((String) entry.getValue()));
} else if (entry.getValue() instanceof ArrayList) {
indices((ArrayList<String>) entry.getValue());
} else {
throw new IllegalArgumentException("malformed indices section, should be an array of strings");
}
} else {
if (IndicesOptions.isIndicesOptions(name) == false) {
throw new IllegalArgumentException("Unknown parameter " + name);
}
}
}
return this;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("indices");
for (String index : indices) {
builder.value(index);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public String getDescription() {
return "remote_store";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o;
return waitForCompletion == that.waitForCompletion && Arrays.equals(indices, that.indices);
}

@Override
public int hashCode() {
int result = Objects.hash(waitForCompletion);
result = 31 * result + Arrays.hashCode(indices);
return result;
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Restore Snapshot transport handler. */
package org.opensearch.action.admin.cluster.remotestore.restore;
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.cluster.routing.RecoverySource.LocalShardsRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
import org.opensearch.common.Randomness;
import org.opensearch.common.collect.ImmutableOpenIntMap;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -444,12 +445,33 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery
return initializeAsRestore(indexMetadata, recoverySource, null, false, unassignedInfo);
}

/**
* Initializes an existing index, to be restored from remote store
*/
public Builder initializeAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) {
final UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
"restore_source[remote_store]"
);
assert indexMetadata.getIndex().equals(index);
if (!shards.isEmpty()) {
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo));
shards.put(shardNumber, indexShardRoutingBuilder.build());
}
return this;
}

/**
* Initializes an index, to be restored from snapshot
*/
private Builder initializeAsRestore(
IndexMetadata indexMetadata,
SnapshotRecoverySource recoverySource,
RecoverySource recoverySource,
IntSet ignoreShards,
boolean asNew,
UnassignedInfo unassignedInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public static RecoverySource readFrom(StreamInput in) throws IOException {
return new SnapshotRecoverySource(in);
case LOCAL_SHARDS:
return LocalShardsRecoverySource.INSTANCE;
case REMOTE_STORE:
return new RemoteStoreRecoverySource(in);
default:
throw new IllegalArgumentException("unknown recovery type: " + type.name());
}
Expand Down Expand Up @@ -116,7 +118,8 @@ public enum Type {
EXISTING_STORE,
PEER,
SNAPSHOT,
LOCAL_SHARDS
LOCAL_SHARDS,
REMOTE_STORE
}

public abstract Type getType();
Expand Down Expand Up @@ -349,6 +352,97 @@ public int hashCode() {

}

/**
* Recovery from remote store
*
* @opensearch.internal
*/
public static class RemoteStoreRecoverySource extends RecoverySource {

private final String restoreUUID;
private final IndexId index;
private final Version version;

public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId) {
this.restoreUUID = restoreUUID;
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
}

RemoteStoreRecoverySource(StreamInput in) throws IOException {
restoreUUID = in.readString();
version = Version.readVersion(in);
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
index = new IndexId(in);
} else {
index = new IndexId(in.readString(), IndexMetadata.INDEX_UUID_NA_VALUE);
}
}

public String restoreUUID() {
return restoreUUID;
}

/**
* Gets the {@link IndexId} of the recovery source. May contain {@link IndexMetadata#INDEX_UUID_NA_VALUE} as the index uuid if it
* was created by an older version cluster-manager in a mixed version cluster.
*
* @return IndexId
*/
public IndexId index() {
return index;
}

public Version version() {
return version;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Version.writeVersion(version, out);
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
index.writeTo(out);
} else {
out.writeString(index.getName());
}
}

@Override
public Type getType() {
return Type.REMOTE_STORE;
}

@Override
public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("version", version.toString()).field("index", index.getName()).field("restoreUUID", restoreUUID);
}

@Override
public String toString() {
return "remote store recovery [" + restoreUUID + "]";
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RemoteStoreRecoverySource that = (RemoteStoreRecoverySource) o;
return restoreUUID.equals(that.restoreUUID) && index.equals(that.index) && version.equals(that.version);
}

@Override
public int hashCode() {
return Objects.hash(restoreUUID, index, version);
}

}

/**
* peer recovery from a primary shard
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -564,6 +565,13 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) {
return add(indexRoutingBuilder);
}

public Builder addAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex())
.initializeAsRemoteStoreRestore(indexMetadata, recoverySource);
add(indexRoutingBuilder);
return this;
}

public Builder addAsRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsRestore(
indexMetadata,
Expand Down
8 changes: 1 addition & 7 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,13 +514,7 @@ public synchronized IndexShard createShard(
this.indexSettings,
path
);
remoteStore = new Store(
shardId,
this.indexSettings,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Expand Down
Loading

0 comments on commit 14b84f9

Please sign in to comment.