From 14b84f9442730dc206f8c3a355fe479e6d4b99f1 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 5 Jul 2022 22:49:30 +0530 Subject: [PATCH] [Remote Store] Add remote store restore API implementation (#3642) * Add remote restore API implementation Signed-off-by: Sachin Kale --- .../restore/RestoreRemoteStoreRequest.java | 183 ++++++++++++++++++ .../remotestore/restore/package-info.java | 10 + .../cluster/routing/IndexRoutingTable.java | 24 ++- .../cluster/routing/RecoverySource.java | 96 ++++++++- .../cluster/routing/RoutingTable.java | 8 + .../org/opensearch/index/IndexService.java | 8 +- .../opensearch/index/shard/IndexShard.java | 9 + .../opensearch/index/shard/StoreRecovery.java | 57 ++++++ .../index/store/RemoteIndexInput.java | 7 +- .../opensearch/snapshots/RestoreService.java | 91 +++++++++ .../RestoreRemoteStoreRequestTests.java | 83 ++++++++ .../cluster/routing/RecoverySourceTests.java | 20 +- .../cluster/routing/RoutingTableTests.java | 17 ++ .../index/engine/NoOpEngineRecoveryTests.java | 3 +- .../index/shard/IndexShardTests.java | 98 ++++++++-- .../RemoveCorruptedShardDataCommandTests.java | 6 +- .../index/store/RemoteIndexInputTests.java | 13 ++ .../PeerRecoveryTargetServiceTests.java | 3 +- .../BlobStoreRepositoryRestoreTests.java | 3 +- .../cluster/routing/ShardRoutingHelper.java | 4 +- ...enSearchIndexLevelReplicationTestCase.java | 8 +- .../index/shard/IndexShardTestCase.java | 24 ++- 22 files changed, 737 insertions(+), 38 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java new file mode 100644 index 0000000000000..a77477f24aa9f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java @@ -0,0 +1,183 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Restore remote store request + * + * @opensearch.internal + */ +public class RestoreRemoteStoreRequest extends ClusterManagerNodeRequest implements ToXContentObject { + + private String[] indices = Strings.EMPTY_ARRAY; + private Boolean waitForCompletion; + + public RestoreRemoteStoreRequest() {} + + public RestoreRemoteStoreRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + waitForCompletion = in.readOptionalBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + out.writeOptionalBoolean(waitForCompletion); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null || indices.length == 0) { + validationException = addValidationError("indices are missing", validationException); + } + return validationException; + } + + /** + * Sets the list of indices that should be restored from the remote store + *

+ * The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will index all indices with + * prefix "test" except index "test42". Aliases are not supported. An empty list or {"_all"} will restore all open + * indices in the cluster. + * + * @param indices list of indices + * @return this request + */ + public RestoreRemoteStoreRequest indices(String... indices) { + this.indices = indices; + return this; + } + + /** + * Sets the list of indices that should be restored from the remote store + *

+ * The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will index all indices with + * prefix "test" except index "test42". Aliases are not supported. An empty list or {"_all"} will restore all open + * indices in the cluster. + * + * @param indices list of indices + * @return this request + */ + public RestoreRemoteStoreRequest indices(List indices) { + this.indices = indices.toArray(new String[indices.size()]); + return this; + } + + /** + * Returns list of indices that should be restored from the remote store + */ + public String[] indices() { + return indices; + } + + /** + * If this parameter is set to true the operation will wait for completion of restore process before returning. + * + * @param waitForCompletion if true the operation will wait for completion + * @return this request + */ + public RestoreRemoteStoreRequest waitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + + /** + * Returns wait for completion setting + * + * @return true if the operation will wait for completion + */ + public boolean waitForCompletion() { + return waitForCompletion; + } + + /** + * Parses restore definition + * + * @param source restore definition + * @return this request + */ + @SuppressWarnings("unchecked") + public RestoreRemoteStoreRequest source(Map source) { + for (Map.Entry entry : source.entrySet()) { + String name = entry.getKey(); + if (name.equals("indices")) { + if (entry.getValue() instanceof String) { + indices(Strings.splitStringByCommaToArray((String) entry.getValue())); + } else if (entry.getValue() instanceof ArrayList) { + indices((ArrayList) entry.getValue()); + } else { + throw new IllegalArgumentException("malformed indices section, should be an array of strings"); + } + } else { + if (IndicesOptions.isIndicesOptions(name) == false) { + throw new IllegalArgumentException("Unknown parameter " + name); + } + } + } + return this; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("indices"); + for (String index : indices) { + builder.value(index); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public String getDescription() { + return "remote_store"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o; + return waitForCompletion == that.waitForCompletion && Arrays.equals(indices, that.indices); + } + + @Override + public int hashCode() { + int result = Objects.hash(waitForCompletion); + result = 31 * result + Arrays.hashCode(indices); + return result; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java new file mode 100644 index 0000000000000..363b7179f3c6c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Restore Snapshot transport handler. */ +package org.opensearch.action.admin.cluster.remotestore.restore; diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 9eca838bb7945..9463f9ff0a422 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -45,6 +45,7 @@ import org.opensearch.cluster.routing.RecoverySource.LocalShardsRecoverySource; import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; import org.opensearch.common.Randomness; import org.opensearch.common.collect.ImmutableOpenIntMap; import org.opensearch.common.io.stream.StreamInput; @@ -444,12 +445,33 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery return initializeAsRestore(indexMetadata, recoverySource, null, false, unassignedInfo); } + /** + * Initializes an existing index, to be restored from remote store + */ + public Builder initializeAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) { + final UnassignedInfo unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, + "restore_source[remote_store]" + ); + assert indexMetadata.getIndex().equals(index); + if (!shards.isEmpty()) { + throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created"); + } + for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) { + ShardId shardId = new ShardId(index, shardNumber); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo)); + shards.put(shardNumber, indexShardRoutingBuilder.build()); + } + return this; + } + /** * Initializes an index, to be restored from snapshot */ private Builder initializeAsRestore( IndexMetadata indexMetadata, - SnapshotRecoverySource recoverySource, + RecoverySource recoverySource, IntSet ignoreShards, boolean asNew, UnassignedInfo unassignedInfo diff --git a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java index 6ffa155729c7a..539773296ed74 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java @@ -88,6 +88,8 @@ public static RecoverySource readFrom(StreamInput in) throws IOException { return new SnapshotRecoverySource(in); case LOCAL_SHARDS: return LocalShardsRecoverySource.INSTANCE; + case REMOTE_STORE: + return new RemoteStoreRecoverySource(in); default: throw new IllegalArgumentException("unknown recovery type: " + type.name()); } @@ -116,7 +118,8 @@ public enum Type { EXISTING_STORE, PEER, SNAPSHOT, - LOCAL_SHARDS + LOCAL_SHARDS, + REMOTE_STORE } public abstract Type getType(); @@ -349,6 +352,97 @@ public int hashCode() { } + /** + * Recovery from remote store + * + * @opensearch.internal + */ + public static class RemoteStoreRecoverySource extends RecoverySource { + + private final String restoreUUID; + private final IndexId index; + private final Version version; + + public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId) { + this.restoreUUID = restoreUUID; + this.version = Objects.requireNonNull(version); + this.index = Objects.requireNonNull(indexId); + } + + RemoteStoreRecoverySource(StreamInput in) throws IOException { + restoreUUID = in.readString(); + version = Version.readVersion(in); + if (in.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) { + index = new IndexId(in); + } else { + index = new IndexId(in.readString(), IndexMetadata.INDEX_UUID_NA_VALUE); + } + } + + public String restoreUUID() { + return restoreUUID; + } + + /** + * Gets the {@link IndexId} of the recovery source. May contain {@link IndexMetadata#INDEX_UUID_NA_VALUE} as the index uuid if it + * was created by an older version cluster-manager in a mixed version cluster. + * + * @return IndexId + */ + public IndexId index() { + return index; + } + + public Version version() { + return version; + } + + @Override + protected void writeAdditionalFields(StreamOutput out) throws IOException { + out.writeString(restoreUUID); + Version.writeVersion(version, out); + if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) { + index.writeTo(out); + } else { + out.writeString(index.getName()); + } + } + + @Override + public Type getType() { + return Type.REMOTE_STORE; + } + + @Override + public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.field("version", version.toString()).field("index", index.getName()).field("restoreUUID", restoreUUID); + } + + @Override + public String toString() { + return "remote store recovery [" + restoreUUID + "]"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RemoteStoreRecoverySource that = (RemoteStoreRecoverySource) o; + return restoreUUID.equals(that.restoreUUID) && index.equals(that.index) && version.equals(that.version); + } + + @Override + public int hashCode() { + return Objects.hash(restoreUUID, index, version); + } + + } + /** * peer recovery from a primary shard * diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 8c2d3ddb0697f..15bb997bfb05a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; import org.opensearch.common.Nullable; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.io.stream.StreamInput; @@ -564,6 +565,13 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) { return add(indexRoutingBuilder); } + public Builder addAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) { + IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()) + .initializeAsRemoteStoreRestore(indexMetadata, recoverySource); + add(indexRoutingBuilder); + return this; + } + public Builder addAsRestore(IndexMetadata indexMetadata, SnapshotRecoverySource recoverySource) { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsRestore( indexMetadata, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 6ed3fffc191a1..7c3675fab423c 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -514,13 +514,7 @@ public synchronized IndexShard createShard( this.indexSettings, path ); - remoteStore = new Store( - shardId, - this.indexSettings, - remoteDirectory, - lock, - new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)) - ); + remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); } Directory directory = directoryFactory.newDirectory(this.indexSettings, path); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 19eb92d2962e4..3de4c460b744e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2243,6 +2243,12 @@ public void recoverFromStore(ActionListener listener) { storeRecovery.recoverFromStore(this, listener); } + public void restoreFromRemoteStore(ActionListener listener) { + assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); + storeRecovery.recoverFromRemoteStore(this, listener); + } + public void restoreFromRepository(Repository repository, ActionListener listener) { try { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; @@ -3029,6 +3035,9 @@ public void startRecovery( case EXISTING_STORE: executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; + case REMOTE_STORE: + executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore); + break; case PEER: try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 0499140237dbd..1190e8e6ab3d2 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -117,6 +117,20 @@ void recoverFromStore(final IndexShard indexShard, ActionListener liste } } + void recoverFromRemoteStore(final IndexShard indexShard, ActionListener listener) { + if (canRecover(indexShard)) { + RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); + assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType; + ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { + logger.debug("starting recovery from remote store ..."); + recoverFromRemoteStore(indexShard); + return true; + }); + } else { + listener.onResponse(false); + } + } + void recoverFromLocalShards( Consumer mappingUpdateConsumer, IndexShard indexShard, @@ -424,6 +438,49 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi }); } + private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException { + final Store remoteStore = indexShard.remoteStore(); + if (remoteStore == null) { + throw new IndexShardRecoveryException( + indexShard.shardId(), + "Remote store is not enabled for this index", + new IllegalArgumentException() + ); + } + indexShard.preRecovery(); + indexShard.prepareForIndexRecovery(); + final Directory remoteDirectory = remoteStore.directory(); + final Store store = indexShard.store(); + final Directory storeDirectory = store.directory(); + store.incRef(); + remoteStore.incRef(); + try { + // Cleaning up local directory before copying file from remote directory. + // This is done to make sure we start with clean slate. + // ToDo: Check if we can copy only missing files + for (String file : storeDirectory.listAll()) { + storeDirectory.deleteFile(file); + } + for (String file : remoteDirectory.listAll()) { + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + } + // This creates empty trans-log for now + // ToDo: Add code to restore from remote trans-log + bootstrap(indexShard, store); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + indexShard.recoveryState().getIndex().setFileDetailsComplete(); + indexShard.openEngineAndRecoverFromTranslog(); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); + indexShard.finalizeRecovery(); + indexShard.postRecovery("post recovery from remote_store"); + } catch (IOException e) { + throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e); + } finally { + store.decRef(); + remoteStore.decRef(); + } + } + /** * Recovers the state of the shard from the store. */ diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java index 24e1128dec1b5..8f8d5dd5418ae 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java @@ -43,7 +43,12 @@ public byte readByte() throws IOException { @Override public void readBytes(byte[] b, int offset, int len) throws IOException { - inputStream.read(b, offset, len); + int bytesRead = inputStream.read(b, offset, len); + while (bytesRead > 0 && bytesRead < len) { + len -= bytesRead; + offset += bytesRead; + bytesRead = inputStream.read(b, offset, len); + } } @Override diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 0b32621c716ca..a14de23d81d09 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -42,6 +42,7 @@ import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterChangedEvent; @@ -68,6 +69,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; import org.opensearch.cluster.routing.RoutingChangesObserver; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -114,6 +116,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_UPGRADED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE; import static org.opensearch.common.util.set.Sets.newHashSet; import static org.opensearch.snapshots.SnapshotUtils.filterIndices; @@ -196,6 +199,94 @@ public RestoreService( this.shardLimitValidator = shardLimitValidator; } + /** + * Restores data from remote store for indices specified in the restore request. + * + * @param request restore request + * @param listener restore listener + */ + public void restoreFromRemoteStore(RestoreRemoteStoreRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("restore[remote_store]", new ClusterStateUpdateTask() { + final String restoreUUID = UUIDs.randomBase64UUID(); + RestoreInfo restoreInfo = null; + + @Override + public ClusterState execute(ClusterState currentState) { + // Updating cluster state + ClusterState.Builder builder = ClusterState.builder(currentState); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); + + List indicesToBeRestored = new ArrayList<>(); + int totalShards = 0; + for (String index : request.indices()) { + IndexMetadata currentIndexMetadata = currentState.metadata().index(index); + if (currentIndexMetadata == null) { + // ToDo: Handle index metadata does not exist case. (GitHub #3457) + logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index); + continue; + } + if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE, false)) { + if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { + throw new IllegalStateException( + "cannot restore index [" + + index + + "] because an open index " + + "with same name already exists in the cluster. Close the existing index" + ); + } + IndexMetadata updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata) + .state(IndexMetadata.State.OPEN) + .version(1 + currentIndexMetadata.getVersion()) + .mappingVersion(1 + currentIndexMetadata.getMappingVersion()) + .settingsVersion(1 + currentIndexMetadata.getSettingsVersion()) + .aliasesVersion(1 + currentIndexMetadata.getAliasesVersion()) + .build(); + + IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID()); + + RemoteStoreRecoverySource recoverySource = new RemoteStoreRecoverySource( + restoreUUID, + updatedIndexMetadata.getCreationVersion(), + indexId + ); + rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource); + blocks.updateBlocks(updatedIndexMetadata); + mdBuilder.put(updatedIndexMetadata, true); + indicesToBeRestored.add(index); + totalShards += updatedIndexMetadata.getNumberOfShards(); + } else { + logger.warn("Remote store is not enabled for index: {}", index); + } + } + + restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards); + + RoutingTable rt = rtBuilder.build(); + ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); + return allocationService.reroute(updatedState, "restored from remote store"); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to restore from remote store", e); + listener.onFailure(e); + } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new RestoreCompletionResponse(restoreUUID, null, restoreInfo)); + } + }); + + } + /** * Restores snapshot specified in the restore request. * diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java new file mode 100644 index 0000000000000..0941c9a1c61d0 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.Collections; + +public class RestoreRemoteStoreRequestTests extends AbstractWireSerializingTestCase { + private RestoreRemoteStoreRequest randomState(RestoreRemoteStoreRequest instance) { + if (randomBoolean()) { + List indices = new ArrayList<>(); + int count = randomInt(3) + 1; + + for (int i = 0; i < count; ++i) { + indices.add(randomAlphaOfLength(randomInt(3) + 2)); + } + + instance.indices(indices); + } + + instance.waitForCompletion(randomBoolean()); + + if (randomBoolean()) { + instance.masterNodeTimeout(randomTimeValue()); + } + + return instance; + } + + @Override + protected RestoreRemoteStoreRequest createTestInstance() { + return randomState(new RestoreRemoteStoreRequest()); + } + + @Override + protected Writeable.Reader instanceReader() { + return RestoreRemoteStoreRequest::new; + } + + @Override + protected RestoreRemoteStoreRequest mutateInstance(RestoreRemoteStoreRequest instance) throws IOException { + RestoreRemoteStoreRequest copy = copyInstance(instance); + // ensure that at least one property is different + List indices = new ArrayList<>(List.of(instance.indices())); + indices.add("copied"); + copy.indices(indices); + return randomState(copy); + } + + public void testSource() throws IOException { + RestoreRemoteStoreRequest original = createTestInstance(); + XContentBuilder builder = original.toXContent(XContentFactory.jsonBuilder(), new ToXContent.MapParams(Collections.emptyMap())); + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, null, BytesReference.bytes(builder).streamInput()); + Map map = parser.mapOrdered(); + + RestoreRemoteStoreRequest processed = new RestoreRemoteStoreRequest(); + processed.masterNodeTimeout(original.masterNodeTimeout()); + processed.waitForCompletion(original.waitForCompletion()); + processed.source(map); + + assertEquals(original, processed); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java b/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java index 45dafe08d926c..e4aae52f41e68 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RecoverySourceTests.java @@ -32,7 +32,10 @@ package org.opensearch.cluster.routing; +import org.opensearch.Version; +import org.opensearch.common.UUIDs; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.repositories.IndexId; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -57,10 +60,25 @@ public void testRecoverySourceTypeOrder() { assertEquals(RecoverySource.Type.PEER.ordinal(), 2); assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3); assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4); + assertEquals(RecoverySource.Type.REMOTE_STORE.ordinal(), 5); // check exhaustiveness for (RecoverySource.Type type : RecoverySource.Type.values()) { assertThat(type.ordinal(), greaterThanOrEqualTo(0)); - assertThat(type.ordinal(), lessThanOrEqualTo(4)); + assertThat(type.ordinal(), lessThanOrEqualTo(5)); } } + + public void testSerializationRemoteStoreRecoverySource() throws IOException { + RecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource( + UUIDs.randomBase64UUID(), + Version.CURRENT, + new IndexId("some_index", UUIDs.randomBase64UUID(random())) + ); + + BytesStreamOutput out = new BytesStreamOutput(); + recoverySource.writeTo(out); + RecoverySource serializedRecoverySource = RecoverySource.readFrom(out.bytes().streamInput()); + assertEquals(recoverySource.getType(), serializedRecoverySource.getType()); + assertEquals(recoverySource, serializedRecoverySource); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index cc151b2cae086..133a334ee2c20 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -60,6 +60,9 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; +import org.opensearch.repositories.IndexId; + public class RoutingTableTests extends OpenSearchAllocationTestCase { private static final String TEST_INDEX_1 = "test1"; @@ -492,6 +495,20 @@ public void testAddAsRecovery() { } } + public void testAddAsRemoteStoreRestore() { + final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN).build(); + final RemoteStoreRecoverySource remoteStoreRecoverySource = new RemoteStoreRecoverySource( + "restore_uuid", + Version.CURRENT, + new IndexId(TEST_INDEX_1, "1") + ); + final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore(indexMetadata, remoteStoreRecoverySource) + .build(); + assertTrue(routingTable.hasIndex(TEST_INDEX_1)); + assertEquals(this.numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); + assertEquals(this.numberOfShards, routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size()); + } + /** reverse engineer the in sync aid based on the given indexRoutingTable **/ public static IndexMetadata updateActiveAllocations(IndexRoutingTable indexRoutingTable, IndexMetadata indexMetadata) { IndexMetadata.Builder imdBuilder = IndexMetadata.builder(indexMetadata); diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java index 4a895e1efc152..3162f7915c994 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java @@ -58,7 +58,8 @@ public void testRecoverFromNoOp() throws IOException { initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), indexShard.indexSettings().getIndexMetadata(), NoOpEngine::new, - new EngineConfigFactory(indexShard.indexSettings()) + new EngineConfigFactory(indexShard.indexSettings()), + null ); recoverShardFromStore(primary); assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes()); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index a3a49e9e30564..5871ff64d0c1d 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -44,6 +44,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.tests.mockfile.ExtrasFS; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.junit.Assert; @@ -1216,7 +1218,8 @@ public void testGlobalCheckpointSync() throws IOException { null, new InternalEngineFactory(), () -> synced.set(true), - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + null ); // add a replica recoverShardFromStore(primaryShard); @@ -1290,7 +1293,8 @@ public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception { null, new InternalEngineFactory(), () -> synced.set(true), - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + null ); recoverShardFromStore(primaryShard); IndexShard replicaShard = newShard(shardId, false); @@ -1700,7 +1704,8 @@ public Set getPendingDeletions() throws IOException { new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); shard.addShardFailureCallback((ig) -> failureCallbackTriggered.set(true)); @@ -2548,7 +2553,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { shard.getEngineConfigFactory(), shard.getGlobalCheckpointSyncer(), shard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); @@ -2650,6 +2656,66 @@ public void restoreShard( closeShards(target); } + public void testRestoreShardFromRemoteStore() throws IOException { + IndexShard target = newStartedShard( + true, + Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE, true).build(), + new InternalEngineFactory() + ); + + indexDoc(target, "_doc", "1"); + indexDoc(target, "_doc", "2"); + target.refresh("test"); + assertDocs(target, "1", "2"); + flushShard(target); + + ShardRouting routing = ShardRoutingHelper.initWithSameId( + target.routingEntry(), + RecoverySource.ExistingStoreRecoverySource.INSTANCE + ); + routing = ShardRoutingHelper.newWithRestoreSource( + routing, + new RecoverySource.RemoteStoreRecoverySource( + UUIDs.randomBase64UUID(), + Version.CURRENT, + new IndexId("test", UUIDs.randomBase64UUID(random())) + ) + ); + + // Delete files in store directory to restore from remote directory + Directory storeDirectory = target.store().directory(); + for (String file : storeDirectory.listAll()) { + storeDirectory.deleteFile(file); + } + + Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) target.remoteStore().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) remoteDirectory).setCheckIndexOnClose(false); + + // extra0 file is added as a part of https://lucene.apache.org/core/7_2_1/test-framework/org/apache/lucene/mockfile/ExtrasFS.html + // Safe to remove without impacting the test + for (String file : remoteDirectory.listAll()) { + if (ExtrasFS.isExtra(file)) { + remoteDirectory.deleteFile(file); + } + } + + target.remoteStore().incRef(); + target = reinitShard(target, routing); + + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null)); + final PlainActionFuture future = PlainActionFuture.newFuture(); + target.restoreFromRemoteStore(future); + target.remoteStore().decRef(); + + assertTrue(future.actionGet()); + assertDocs(target, "1", "2"); + + storeDirectory = ((FilterDirectory) ((FilterDirectory) target.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(target); + } + public void testReaderWrapperIsUsed() throws IOException { IndexShard shard = newStartedShard(true); indexDoc(shard, "_doc", "0", "{\"foo\" : \"bar\"}"); @@ -2678,7 +2744,8 @@ public void testReaderWrapperIsUsed() throws IOException { shard.getEngineConfigFactory(), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); recoverShardFromStore(newShard); @@ -2828,7 +2895,8 @@ public void testSearchIsReleaseIfWrapperFails() throws IOException { shard.getEngineConfigFactory(), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); recoverShardFromStore(newShard); @@ -3493,7 +3561,8 @@ private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPubl () -> {}, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER, - checkpointPublisher + checkpointPublisher, + null ); } @@ -3549,7 +3618,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); final IndexShardRecoveryException indexShardRecoveryException = expectThrows( @@ -3606,7 +3676,8 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception { indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); final IndexShardRecoveryException exception1 = expectThrows( @@ -3640,7 +3711,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); final IndexShardRecoveryException exception2 = expectThrows( @@ -3694,7 +3766,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { indexShard.engineConfigFactory, indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); Store.MetadataSnapshot storeFileMetadatas = newShard.snapshotStoreMetadata(); @@ -4441,7 +4514,8 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { // just like a following shard, we need to skip this check for now. } }, - shard.getEngineConfigFactory() + shard.getEngineConfigFactory(), + null ); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); readonlyShard.markAsRecovering("store", new RecoveryState(readonlyShard.routingEntry(), localNode, null)); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 9a2a0dd7e070c..7ee3addf9b784 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -174,7 +174,8 @@ public void setup() throws IOException { new EngineConfigFactory(new IndexSettings(indexMetadata, settings)), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ), true ); @@ -550,7 +551,8 @@ private IndexShard reopenIndexShard(boolean corrupted) throws IOException { indexShard.getEngineConfigFactory(), indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java index 5f96b57642d18..ec7cda1eb2d87 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java @@ -18,6 +18,8 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doThrow; @@ -59,6 +61,17 @@ public void testReadBytes() throws IOException { verify(inputStream).read(buffer, 10, 20); } + public void testReadBytesMultipleIterations() throws IOException { + byte[] buffer = new byte[20]; + when(inputStream.read(eq(buffer), anyInt(), anyInt())).thenReturn(10).thenReturn(3).thenReturn(6).thenReturn(-1); + remoteIndexInput.readBytes(buffer, 0, 20); + + verify(inputStream).read(buffer, 0, 20); + verify(inputStream).read(buffer, 10, 10); + verify(inputStream).read(buffer, 13, 7); + verify(inputStream).read(buffer, 19, 1); + } + public void testReadBytesIOException() throws IOException { byte[] buffer = new byte[10]; when(inputStream.read(buffer, 10, 20)).thenThrow(new IOException("Error reading")); diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index d85b2f1e22979..2a88345346e52 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -317,7 +317,8 @@ public void testClosedIndexSkipsLocalRecovery() throws Exception { ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE), indexMetadata, NoOpEngine::new, - new EngineConfigFactory(shard.indexSettings()) + new EngineConfigFactory(shard.indexSettings()), + null ); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 7cbe3d6dbd30a..81a4a4eace608 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -140,7 +140,8 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { new EngineConfigFactory(shard.indexSettings()), () -> {}, RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); // restore the shard diff --git a/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java b/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java index ccd686210e8d9..6df189081cfa4 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java +++ b/test/framework/src/main/java/org/opensearch/cluster/routing/ShardRoutingHelper.java @@ -32,8 +32,6 @@ package org.opensearch.cluster.routing; -import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; - /** * A helper class that allows access to package private APIs for testing. */ @@ -77,7 +75,7 @@ public static ShardRouting moveToUnassigned(ShardRouting routing, UnassignedInfo return routing.moveToUnassigned(info); } - public static ShardRouting newWithRestoreSource(ShardRouting routing, SnapshotRecoverySource recoverySource) { + public static ShardRouting newWithRestoreSource(ShardRouting routing, RecoverySource recoverySource) { return new ShardRouting( routing.shardId(), routing.currentNodeId(), diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 57368bed9b7ac..b3f062aef4fbe 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -235,7 +235,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); + primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer, null); replicas = new CopyOnWriteArrayList<>(); this.indexMetadata = indexMetadata; updateAllocationIDsOnPrimary(); @@ -362,7 +362,8 @@ public IndexShard addReplica() throws IOException { null, getEngineFactory(replicaRouting), () -> {}, - retentionLeaseSyncer + retentionLeaseSyncer, + null ); addReplica(replica); return replica; @@ -403,7 +404,8 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP getEngineConfigFactory(new IndexSettings(indexMetadata, indexMetadata.getSettings())), () -> {}, retentionLeaseSyncer, - EMPTY_EVENT_LISTENER + EMPTY_EVENT_LISTENER, + null ); replicas.add(newReplica); if (replicationTargets != null) { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a8d140f6bb1b4..7dedc572ff19b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -301,7 +301,7 @@ protected IndexShard newShard( .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("{ \"properties\": {} }"); - return newShard(shardRouting, metadata.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners); + return newShard(shardRouting, metadata.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, null, listeners); } /** @@ -370,7 +370,8 @@ protected IndexShard newShard( readerWrapper, new InternalEngineFactory(), globalCheckpointSyncer, - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + null ); } @@ -389,7 +390,7 @@ protected IndexShard newShard( EngineFactory engineFactory, IndexingOperationListener... listeners ) throws IOException { - return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners); + return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, null, listeners); } /** @@ -408,6 +409,7 @@ protected IndexShard newShard( @Nullable EngineFactory engineFactory, Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, + Store remoteStore, IndexingOperationListener... listeners ) throws IOException { // add node id as name to settings for proper logging @@ -425,6 +427,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, EMPTY_EVENT_LISTENER, + remoteStore, listeners ); } @@ -452,6 +455,7 @@ protected IndexShard newShard( Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, + Store remoteStore, IndexingOperationListener... listeners ) throws IOException { return newShard( @@ -466,6 +470,7 @@ protected IndexShard newShard( retentionLeaseSyncer, indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, + remoteStore, listeners ); } @@ -494,6 +499,7 @@ protected IndexShard newShard( RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, SegmentReplicationCheckpointPublisher checkpointPublisher, + @Nullable Store remoteStore, IndexingOperationListener... listeners ) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); @@ -521,6 +527,13 @@ protected IndexShard newShard( Collections.emptyList(), clusterSettings ); + if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { + ShardId shardId = shardPath.getShardId(); + NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(createTempDir()); + ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); + storeProvider = is -> createStore(is, remoteShardPath); + remoteStore = storeProvider.apply(indexSettings); + } indexShard = new IndexShard( routing, indexSettings, @@ -543,7 +556,7 @@ protected IndexShard newShard( retentionLeaseSyncer, breakerService, checkpointPublisher, - null + remoteStore ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; @@ -585,6 +598,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index current.indexSettings.getIndexMetadata(), current.engineFactory, current.engineConfigFactory, + current.remoteStore(), listeners ); } @@ -603,6 +617,7 @@ protected IndexShard reinitShard( IndexMetadata indexMetadata, EngineFactory engineFactory, EngineConfigFactory engineConfigFactory, + Store remoteStore, IndexingOperationListener... listeners ) throws IOException { closeShards(current); @@ -617,6 +632,7 @@ protected IndexShard reinitShard( current.getGlobalCheckpointSyncer(), current.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, + remoteStore, listeners ); }