Skip to content

Commit

Permalink
[Snapshot Interop] Add changes for overriding remote store and replic… (
Browse files Browse the repository at this point in the history
opensearch-project#11868)

Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
  • Loading branch information
harishbhakuni authored Apr 11, 2024
1 parent 74232c7 commit 4fc3a02
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 164 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
Expand All @@ -36,6 +34,7 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -55,7 +54,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
Expand Down Expand Up @@ -118,7 +116,7 @@ private void assertDocsPresentInIndex(Client client, String indexName, int numOf
}
}

public void testRestoreOperationsShallowCopyEnabled() throws IOException, ExecutionException, InterruptedException {
public void testRestoreOperationsShallowCopyEnabled() throws Exception {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
String indexName1 = "testindex1";
Expand All @@ -129,8 +127,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);
String restoredIndexName1 = indexName1 + "-restored";
String restoredIndexName1Seg = indexName1 + "-restored-seg";
String restoredIndexName1Doc = indexName1 + "-restored-doc";
String restoredIndexName2 = indexName2 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
Expand Down Expand Up @@ -212,60 +208,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);

// restore index as seg rep enabled with remote store and remote translog disabled
RestoreSnapshotResponse restoreSnapshotResponse3 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(false)
.setIgnoreIndexSettings(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1Seg)
.get();
assertEquals(restoreSnapshotResponse3.status(), RestStatus.ACCEPTED);
ensureGreen(restoredIndexName1Seg);

GetIndexResponse getIndexResponse = client.admin()
.indices()
.getIndex(new GetIndexRequest().indices(restoredIndexName1Seg).includeDefaults(true))
.get();
indexSettings = getIndexResponse.settings().get(restoredIndexName1Seg);
assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, null));
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, restoredIndexName1Seg, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1Seg);
assertDocsPresentInIndex(client, restoredIndexName1Seg, numDocsInIndex1 + 2);

// restore index as doc rep based from shallow copy snapshot
RestoreSnapshotResponse restoreSnapshotResponse4 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(false)
.setIgnoreIndexSettings(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, IndexMetadata.SETTING_REPLICATION_TYPE)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1Doc)
.get();
assertEquals(restoreSnapshotResponse4.status(), RestStatus.ACCEPTED);
ensureGreen(restoredIndexName1Doc);

getIndexResponse = client.admin()
.indices()
.getIndex(new GetIndexRequest().indices(restoredIndexName1Doc).includeDefaults(true))
.get();
indexSettings = getIndexResponse.settings().get(restoredIndexName1Doc);
assertNull(indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
assertNull(indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, null));
assertNull(indexSettings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, restoredIndexName1Doc, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1Doc);
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2);
}

/**
Expand Down Expand Up @@ -579,83 +521,6 @@ protected IndexShard getIndexShard(String node, String indexName) {
return shardId.map(indexService::getShard).orElse(null);
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepo2Name = "test-rs-repo-2" + TEST_REMOTE_STORE_REPO_SUFFIX;
String snapshotName1 = "test-restore-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
Path absolutePath3 = randomRepoPath().toAbsolutePath();
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
createRepository(remoteStoreRepo2Name, "fs", absolutePath3);

Client client = client();
Settings indexSettings = getIndexSettings(1, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(1, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 5;
final int numDocsInIndex2 = 6;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

internalCluster().startDataOnlyNode();

logger.info("--> snapshot");
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

Settings remoteStoreIndexSettings = Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStoreRepo2Name)
.build();
// restore index as a remote store index with different remote store repo
RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(false)
.setIndexSettings(remoteStoreIndexSettings)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1);

// deleting data for restoredIndexName1 and restoring from remote store.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
// Re-initialize client to make sure we are not using client from stopped node.
client = client(clusterManagerNode);
assertAcked(client.admin().indices().prepareClose(restoredIndexName1));
client.admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(restoredIndexName1).restoreAllShards(true),
PlainActionFuture.newFuture()
);
ensureYellowAndNoInitializingShards(restoredIndexName1);
ensureGreen(restoredIndexName1);
// indexing some new docs and validating
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException {
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
Expand Down Expand Up @@ -787,4 +652,98 @@ public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionExcep
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testInvalidRestoreRequestScenarios() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
String index = "test-index";
String snapshotRepo = "test-restore-snapshot-repo";
String newRemoteStoreRepo = "test-new-rs-repo";
String snapshotName1 = "test-restore-snapshot1";
String snapshotName2 = "test-restore-snapshot2";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);
String restoredIndex = index + "-restored";

createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = getIndexSettings(1, 0).build();
createIndex(index, indexSettings);

final int numDocsInIndex = 5;
indexDocuments(client, index, numDocsInIndex);
ensureGreen(index);

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepo, snapshotName1, new ArrayList<>(List.of(index)));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));

updateRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, false));
SnapshotInfo snapshotInfo2 = createSnapshot(snapshotRepo, snapshotName2, new ArrayList<>(List.of(index)));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));

DeleteResponse deleteResponse = client().prepareDelete(index, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
indexDocuments(client, index, numDocsInIndex, numDocsInIndex + randomIntBetween(2, 5));
ensureGreen(index);

// try index restore with remote store disabled
SnapshotRestoreException exception = expectThrows(
SnapshotRestoreException.class,
() -> client().admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshotName1)
.setWaitForCompletion(false)
.setIgnoreIndexSettings(SETTING_REMOTE_STORE_ENABLED)
.setIndices(index)
.setRenamePattern(index)
.setRenameReplacement(restoredIndex)
.get()
);
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.enabled] on restore"));

// try index restore with remote store repository modified
Settings remoteStoreIndexSettings = Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, newRemoteStoreRepo)
.build();

exception = expectThrows(
SnapshotRestoreException.class,
() -> client().admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshotName1)
.setWaitForCompletion(false)
.setIndexSettings(remoteStoreIndexSettings)
.setIndices(index)
.setRenamePattern(index)
.setRenameReplacement(restoredIndex)
.get()
);
assertTrue(exception.getMessage().contains("cannot modify setting [index.remote_store.segment.repository]" + " on restore"));

// try index restore with remote store repository and translog store repository disabled
exception = expectThrows(
SnapshotRestoreException.class,
() -> client().admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshotName1)
.setWaitForCompletion(false)
.setIgnoreIndexSettings(
IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY
)
.setIndices(index)
.setRenamePattern(index)
.setRenameReplacement(restoredIndex)
.get()
);
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ public void createSnapshot() {
}

public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) {
return restoreSnapshotWithSettings(indexSettings, RESTORED_INDEX_NAME);
}

public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings, String restoredIndexName) {
RestoreSnapshotRequestBuilder builder = client().admin()
.cluster()
.prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME)
.setWaitForCompletion(false)
.setRenamePattern(INDEX_NAME)
.setRenameReplacement(RESTORED_INDEX_NAME);
.setRenameReplacement(restoredIndexName);
if (indexSettings != null) {
builder.setIndexSettings(indexSettings);
}
Expand Down Expand Up @@ -311,7 +315,8 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio
* 2. Snapshot index
* 3. Add new set of nodes with `cluster.indices.replication.strategy` set to SEGMENT and `cluster.index.restrict.replication.type`
* set to true.
* 4. Perform restore on new set of nodes to validate restored index has `DOCUMENT` replication.
* 4. Perform restore on new set of nodes to validate restored index has `SEGMENT` replication.
* 5. Validate that if replication type is passed as DOCUMENT as request parameter, restore operation fails
*/
public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {
final int documentCount = scaledRandomIntBetween(1, 10);
Expand All @@ -337,9 +342,20 @@ public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {

createSnapshot();

// Delete index
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings(), RESTORED_INDEX_NAME);
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());

// Delete indices
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(RESTORED_INDEX_NAME)).get());
assertFalse("index [" + RESTORED_INDEX_NAME + "] should have been deleted", indexExists(RESTORED_INDEX_NAME));

// Start new set of nodes with cluster level replication type setting and restrict replication type setting.
Settings settings = Settings.builder()
Expand All @@ -361,7 +377,25 @@ public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {
// Perform snapshot restore
logger.info("--> Performing snapshot restore to target index");

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> restoreSnapshotWithSettings(null));
restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());

// restore index with cluster default setting
restoreSnapshotWithSettings(restoreIndexSegRepSettings(), RESTORED_INDEX_NAME + "1");

// Perform Snapshot Restore with different index name
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> restoreSnapshotWithSettings(restoreIndexDocRepSettings(), RESTORED_INDEX_NAME + "2")
);
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}
}
Loading

0 comments on commit 4fc3a02

Please sign in to comment.