diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java new file mode 100644 index 0000000000000..69367da9ac557 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationIT; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * The aim of this class is to run Segment Replication integ tests by enabling remote store specific settings. + * This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling + * remote store. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT { + + private static final String REPOSITORY_NAME = "test-remore-store-repo"; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3453ee196077c..f2aa886aed374 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -159,6 +159,8 @@ import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.StoreStats; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; +import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogFactory; @@ -2234,6 +2236,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t if (indexSettings.isRemoteStoreEnabled()) { syncSegmentsFromRemoteSegmentStore(false); } + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + syncRemoteTranslogAndUpdateGlobalCheckpoint(); + } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); onNewEngine(newEngine); @@ -2520,10 +2525,10 @@ public void recoverFromStore(ActionListener listener) { storeRecovery.recoverFromStore(this, listener); } - public void restoreFromRemoteStore(Repository repository, ActionListener listener) { + public void restoreFromRemoteStore(ActionListener listener) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromRemoteStore(this, repository, listener); + storeRecovery.recoverFromRemoteStore(this, listener); } public void restoreFromRepository(Repository repository, ActionListener listener) { @@ -3324,14 +3329,7 @@ public void startRecovery( executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; case REMOTE_STORE: - final Repository remoteTranslogRepo; - final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository(); - if (remoteTranslogRepoName != null) { - remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName); - } else { - remoteTranslogRepo = null; - } - executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l)); + executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l)); break; case PEER: try { @@ -4406,6 +4404,9 @@ public void close() throws IOException { if (indexSettings.isRemoteStoreEnabled()) { syncSegmentsFromRemoteSegmentStore(false); } + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + syncRemoteTranslogAndUpdateGlobalCheckpoint(); + } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } @@ -4439,6 +4440,18 @@ public void close() throws IOException { onSettingsChanged(); } + private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException { + syncTranslogFilesFromRemoteTranslog(); + loadGlobalCheckpointToReplicationTracker(); + } + + public void syncTranslogFilesFromRemoteTranslog() throws IOException { + TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); + assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; + Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); + RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog()); + } + /** * Downloads segments from remote segment store. This method will download segments till * last refresh checkpoint. diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 31a863129cc8c..4618f9004f7b5 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -60,15 +60,11 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.store.Store; -import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.FileTransferTracker; -import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; import java.util.Arrays; @@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener liste } } - void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener listener) { + void recoverFromRemoteStore(final IndexShard indexShard, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType; ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from remote store ..."); - recoverFromRemoteStore(indexShard, repository); + recoverFromRemoteStore(indexShard); return true; }); } else { @@ -441,7 +437,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi }); } - private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException { + private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException { final Store remoteStore = indexShard.remoteStore(); if (remoteStore == null) { throw new IndexShardRecoveryException( @@ -462,8 +458,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); } - if (repository != null) { - syncTranslogFilesFromRemoteTranslog(indexShard, repository); + if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) { + indexShard.syncTranslogFilesFromRemoteTranslog(); } else { bootstrap(indexShard, store); } @@ -482,19 +478,6 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository } } - private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException { - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); - TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( - blobStoreRepository, - indexShard.getThreadPool(), - shardId, - fileTransferTracker - ); - RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog()); - } - /** * Recovers the state of the shard from the store. */ diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index e439a56581c14..339e16db6f360 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -71,4 +71,8 @@ public Translog newTranslog( primaryModeSupplier ); } + + public Repository getRepository() { + return repository; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2b689877e7a27..dcd13ad8916d5 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -22,6 +22,7 @@ import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -116,8 +117,20 @@ public RemoteFsTranslog( } } - public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { + public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); + TranslogTransferManager translogTransferManager = buildTranslogTransferManager( + blobStoreRepository, + threadPool, + shardId, + fileTransferTracker + ); + RemoteFsTranslog.download(translogTransferManager, location); + } + public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { if (Files.notExists(location)) { diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index a2e1fd0f4570f..5253a017f8e0e 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2873,7 +2873,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null)); final PlainActionFuture future = PlainActionFuture.newFuture(); - target.restoreFromRemoteStore(null, future); + target.restoreFromRemoteStore(future); target.remoteStore().decRef(); assertTrue(future.actionGet());