diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java
index c394a1f631690..7e0c1630a76e4 100644
--- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java
@@ -713,7 +713,8 @@ public static final IndexShard newIndexShard(
             null,
             () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
             nodeId,
-            null
+            null,
+            false
         );
     }
 
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java
index f2cb7c9c6bfc8..d2f1e6313db07 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java
@@ -89,8 +89,8 @@ public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Ex
         Index index = resolveIndex(INDEX_NAME);
         Index anotherIndex = resolveIndex(ANOTHER_INDEX);
         IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
-        assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
-        assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true);
+        assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
+        assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), true);
     }
 
     public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception {
@@ -119,8 +119,8 @@ public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Ex
         Index index = resolveIndex(INDEX_NAME);
         Index anotherIndex = resolveIndex(ANOTHER_INDEX);
         IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
-        assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true);
-        assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
+        assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), true);
+        assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
     }
 
     public void testReplicationTypesOverrideNotAllowed_IndexAPI() {
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java
index 88d6f6897ee68..19da668c432cf 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java
@@ -8,13 +8,19 @@
 
 package org.opensearch.remotemigration;
 
+import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
+import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
+import org.opensearch.cluster.metadata.RepositoryMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.repositories.fs.ReloadableFsRepository;
 import org.opensearch.test.OpenSearchIntegTestCase;
 
 import java.nio.file.Path;
+import java.util.concurrent.ExecutionException;
 
-import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
+import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 
 public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
     protected static final String REPOSITORY_NAME = "test-remote-store-repo";
@@ -35,11 +41,10 @@ protected Settings nodeSettings(int nodeOrdinal) {
             return Settings.builder()
                 .put(super.nodeSettings(nodeOrdinal))
                 .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
-                .put("discovery.initial_state_timeout", "500ms")
                 .build();
         } else {
             logger.info("Adding docrep node");
-            return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build();
+            return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
         }
     }
 
@@ -47,4 +52,16 @@ protected Settings nodeSettings(int nodeOrdinal) {
     protected Settings featureFlagSettings() {
         return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
     }
+
+    protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
+        GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
+        GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
+        RepositoryMetadata rmd = res.repositories().get(0);
+        Settings.Builder settings = Settings.builder()
+            .put("location", rmd.settings().get("location"))
+            .put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
+        assertAcked(
+            client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get()
+        );
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java
new file mode 100644
index 0000000000000..b1c429a45a1a1
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java
@@ -0,0 +1,223 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotemigration;
+
+import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
+
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
+import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
+import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.opensearch.action.delete.DeleteResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Client;
+import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.opensearch.common.Priority;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.hamcrest.OpenSearchAssertions;
+import org.opensearch.test.transport.MockTransportService;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Arrays.asList;
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemotePrimaryRelocationIT extends MigrationBaseTestCase {
+    protected int maximumNumberOfShards() {
+        return 1;
+    }
+
+    // TODO: Fix this when we support migration of replicas
+    protected int maximumNumberOfReplicas() {
+        return 0;
+    }
+
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return asList(MockTransportService.TestPlugin.class);
+    }
+
+    public void testMixedModeRelocation() throws Exception {
+        String docRepNode = internalCluster().startNode();
+        Client client = internalCluster().client(docRepNode);
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+
+        // create index with 1 shard and 0 replicas
+        client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
+        ensureGreen("test");
+
+        AtomicInteger numAutoGenDocs = new AtomicInteger();
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
+
+        refresh("test");
+
+        // add remote node in mixed mode cluster
+        addRemote = true;
+        String remoteNode = internalCluster().startNode();
+        internalCluster().validateClusterFormed();
+
+        String remoteNode2 = internalCluster().startNode();
+        internalCluster().validateClusterFormed();
+
+        // assert repo gets registered
+        GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
+        GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
+        assertEquals(1, getRepositoriesResponse.repositories().size());
+
+        // Index some more docs
+        int currentDoc = numAutoGenDocs.get();
+        int finalCurrentDoc1 = currentDoc;
+        waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);
+
+        logger.info("-->  relocating from {} to {} ", docRepNode, remoteNode);
+        client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
+        ClusterHealthResponse clusterHealthResponse = client().admin()
+            .cluster()
+            .prepareHealth()
+            .setTimeout(TimeValue.timeValueSeconds(60))
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .execute()
+            .actionGet();
+
+        assertEquals(0, clusterHealthResponse.getRelocatingShards());
+        assertEquals(remoteNode, primaryNodeName("test"));
+        logger.info("-->  relocation from docrep to remote  complete");
+
+        // Index some more docs
+        currentDoc = numAutoGenDocs.get();
+        int finalCurrentDoc = currentDoc;
+        waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc + 5);
+
+        client().admin()
+            .cluster()
+            .prepareReroute()
+            .add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
+            .execute()
+            .actionGet();
+        clusterHealthResponse = client().admin()
+            .cluster()
+            .prepareHealth()
+            .setTimeout(TimeValue.timeValueSeconds(60))
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .execute()
+            .actionGet();
+
+        assertEquals(0, clusterHealthResponse.getRelocatingShards());
+        assertEquals(remoteNode2, primaryNodeName("test"));
+
+        logger.info("-->  relocation from remote to remote  complete");
+
+        finished.set(true);
+        indexingThread.join();
+        refresh("test");
+        OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
+        OpenSearchAssertions.assertHitCount(
+            client().prepareSearch("test")
+                .setTrackTotalHits(true)// extra paranoia ;)
+                .setQuery(QueryBuilders.termQuery("auto", true))
+                .get(),
+            numAutoGenDocs.get()
+        );
+
+    }
+
+    public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
+        String docRepNode = internalCluster().startNode();
+        Client client = internalCluster().client(docRepNode);
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+
+        // create index with 1 shard and 0 replicas
+        client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
+        ensureGreen("test");
+
+        AtomicInteger numAutoGenDocs = new AtomicInteger();
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
+
+        refresh("test");
+
+        // add remote node in mixed mode cluster
+        addRemote = true;
+        String remoteNode = internalCluster().startNode();
+        internalCluster().validateClusterFormed();
+
+        // assert repo gets registered
+        GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
+        GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
+        assertEquals(1, getRepositoriesResponse.repositories().size());
+
+        setFailRate(REPOSITORY_NAME, 100);
+
+        logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
+        client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
+        ClusterHealthResponse clusterHealthResponse = client().admin()
+            .cluster()
+            .prepareHealth()
+            .setTimeout(TimeValue.timeValueSeconds(5))
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .execute()
+            .actionGet();
+
+        assertEquals(1, clusterHealthResponse.getRelocatingShards());
+        setFailRate(REPOSITORY_NAME, 0);
+        Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
+        clusterHealthResponse = client().admin()
+            .cluster()
+            .prepareHealth()
+            .setTimeout(TimeValue.timeValueSeconds(45))
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .execute()
+            .actionGet();
+        assertEquals(0, clusterHealthResponse.getRelocatingShards());
+        logger.info("--> docrep to remote relocation complete");
+        finished.set(true);
+        indexingThread.join();
+        refresh("test");
+        OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
+        OpenSearchAssertions.assertHitCount(
+            client().prepareSearch("test")
+                .setTrackTotalHits(true)// extra paranoia ;)
+                .setQuery(QueryBuilders.termQuery("auto", true))
+                .get(),
+            numAutoGenDocs.get()
+        );
+    }
+
+    private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
+        Thread indexingThread = new Thread(() -> {
+            while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
+                IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
+                assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
+                DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
+                assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
+                client().prepareIndex("test").setSource("auto", true).get();
+                numAutoGenDocs.incrementAndGet();
+            }
+        });
+        indexingThread.start();
+        return indexingThread;
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java
index 869032a84c2c2..67316ed0e6e6b 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java
@@ -44,7 +44,6 @@ public Settings indexSettings() {
             .build();
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191")
     public void testPrimaryRelocationWhileIndexing() throws Exception {
         internalCluster().startClusterManagerOnlyNode();
         super.testPrimaryRelocationWhileIndexing();
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
index 3899c8a80f442..ba90cbe96e157 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
@@ -28,6 +28,7 @@
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.index.Index;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexService;
@@ -56,8 +57,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
 import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 
@@ -187,7 +191,122 @@ protected BulkResponse indexBulk(String indexName, int numDocs) {
         return client().bulk(bulkRequest).actionGet();
     }
 
-    private Settings defaultIndexSettings() {
+    public static Settings remoteStoreClusterSettings(String name, Path path) {
+        return remoteStoreClusterSettings(name, path, name, path);
+    }
+
+    public static Settings remoteStoreClusterSettings(
+        String segmentRepoName,
+        Path segmentRepoPath,
+        String segmentRepoType,
+        String translogRepoName,
+        Path translogRepoPath,
+        String translogRepoType
+    ) {
+        Settings.Builder settingsBuilder = Settings.builder();
+        settingsBuilder.put(
+            buildRemoteStoreNodeAttributes(
+                segmentRepoName,
+                segmentRepoPath,
+                segmentRepoType,
+                translogRepoName,
+                translogRepoPath,
+                translogRepoType,
+                false
+            )
+        );
+        return settingsBuilder.build();
+    }
+
+    public static Settings remoteStoreClusterSettings(
+        String segmentRepoName,
+        Path segmentRepoPath,
+        String translogRepoName,
+        Path translogRepoPath
+    ) {
+        Settings.Builder settingsBuilder = Settings.builder();
+        settingsBuilder.put(buildRemoteStoreNodeAttributes(segmentRepoName, segmentRepoPath, translogRepoName, translogRepoPath, false));
+        return settingsBuilder.build();
+    }
+
+    public static Settings buildRemoteStoreNodeAttributes(
+        String segmentRepoName,
+        Path segmentRepoPath,
+        String translogRepoName,
+        Path translogRepoPath,
+        boolean withRateLimiterAttributes
+    ) {
+        return buildRemoteStoreNodeAttributes(
+            segmentRepoName,
+            segmentRepoPath,
+            ReloadableFsRepository.TYPE,
+            translogRepoName,
+            translogRepoPath,
+            ReloadableFsRepository.TYPE,
+            withRateLimiterAttributes
+        );
+    }
+
+    public static Settings buildRemoteStoreNodeAttributes(
+        String segmentRepoName,
+        Path segmentRepoPath,
+        String segmentRepoType,
+        String translogRepoName,
+        Path translogRepoPath,
+        String translogRepoType,
+        boolean withRateLimiterAttributes
+    ) {
+        String segmentRepoTypeAttributeKey = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
+            segmentRepoName
+        );
+        String segmentRepoSettingsAttributeKeyPrefix = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
+            segmentRepoName
+        );
+        String translogRepoTypeAttributeKey = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
+            translogRepoName
+        );
+        String translogRepoSettingsAttributeKeyPrefix = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
+            translogRepoName
+        );
+        String stateRepoTypeAttributeKey = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
+            segmentRepoName
+        );
+        String stateRepoSettingsAttributeKeyPrefix = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
+            segmentRepoName
+        );
+
+        Settings.Builder settings = Settings.builder()
+            .put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
+            .put(segmentRepoTypeAttributeKey, segmentRepoType)
+            .put(segmentRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
+            .put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
+            .put(translogRepoTypeAttributeKey, translogRepoType)
+            .put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
+            .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
+            .put(stateRepoTypeAttributeKey, segmentRepoType)
+            .put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
+
+        if (withRateLimiterAttributes) {
+            settings.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean())
+                .put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES);
+        }
+
+        return settings.build();
+    }
+
+    Settings defaultIndexSettings() {
         return Settings.builder()
             .put(super.indexSettings())
             .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java
index c649c4ab13e7e..b019bb57743c9 100644
--- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java
@@ -303,7 +303,7 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio
         // Verify index setting isSegRepEnabled.
         Index index = resolveIndex(RESTORED_INDEX_NAME);
         IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
-        assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
+        assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java
index 1b912518d7e04..fc97d67c6c3af 100644
--- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java
@@ -148,7 +148,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
         IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
         ShardId shardId = shardRouting.shardId();
 
-        if (indexShard.indexSettings().isSegRepEnabled() == false) {
+        if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
             return null;
         }
 
diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java
index bc365b9872037..5d896e392e6bc 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java
@@ -215,7 +215,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
                     ensureNodeCommissioned(node, currentState.metadata());
                     nodesBuilder.add(node);
 
-                    if (remoteDN.isEmpty()) {
+                    if (remoteDN.isEmpty() && node.isRemoteStoreNode()) {
                         // This is hit only on cases where we encounter first remote node
                         logger.info("Updating system repository now for remote store");
                         repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index 0909e2d5c8ff0..11dc4474cfa42 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -90,6 +90,7 @@
 import org.opensearch.index.shard.ShardNotInPrimaryModeException;
 import org.opensearch.index.shard.ShardPath;
 import org.opensearch.index.similarity.SimilarityService;
+import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.TranslogFactory;
@@ -99,7 +100,9 @@
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
+import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
 import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.script.ScriptService;
 import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
 import org.opensearch.threadpool.ThreadPool;
@@ -455,7 +458,10 @@ public synchronized IndexShard createShard(
         final Consumer<ShardId> globalCheckpointSyncer,
         final RetentionLeaseSyncer retentionLeaseSyncer,
         final SegmentReplicationCheckpointPublisher checkpointPublisher,
-        final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
+        final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
+        final RepositoriesService repositoriesService,
+        final DiscoveryNode targetNode,
+        @Nullable DiscoveryNode sourceNode
     ) throws IOException {
         Objects.requireNonNull(retentionLeaseSyncer);
         /*
@@ -484,10 +490,26 @@ public synchronized IndexShard createShard(
                     warmer.warm(reader, shard, IndexService.this.indexSettings);
                 }
             };
-
             Store remoteStore = null;
-            if (this.indexSettings.isRemoteStoreEnabled()) {
-                Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
+            boolean seedRemote = false;
+            if (targetNode.isRemoteStoreNode()) {
+                final Directory remoteDirectory;
+                if (this.indexSettings.isRemoteStoreEnabled()) {
+                    remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
+                } else {
+                    if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) {
+                        if (routing.primary() == false) {
+                            throw new IllegalStateException("Can't migrate a replica shard to remote before its primary " + routing.shardId());
+                        }
+                        logger.info("DocRep shard {} is migrating to remote", shardId);
+                        seedRemote = true;
+                    }
+                    remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
+                        RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
+                        this.indexSettings.getUUID(),
+                        shardId
+                    );
+                }
                 remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
             }
 
@@ -523,12 +545,13 @@ public synchronized IndexShard createShard(
                 retentionLeaseSyncer,
                 circuitBreakerService,
                 translogFactorySupplier,
-                this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
+                this.indexSettings.isSegRepEnabledOrRemoteNode() ? checkpointPublisher : null,
                 remoteStore,
                 remoteStoreStatsTrackerFactory,
                 clusterRemoteTranslogBufferIntervalSupplier,
                 nodeEnv.nodeId(),
-                recoverySettings
+                recoverySettings,
+                seedRemote
             );
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
             eventListener.afterIndexShardCreated(indexShard);
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index 5aaea2c498701..7e49726c259cb 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -52,6 +52,7 @@
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.ingest.IngestService;
 import org.opensearch.node.Node;
+import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
 import org.opensearch.search.pipeline.SearchPipelineService;
 
 import java.util.Arrays;
@@ -1221,17 +1222,20 @@ public int getNumberOfReplicas() {
 
     /**
      * Returns true if segment replication is enabled on the index.
+     *
+     * During the migration, every shard on a remote store node also has SegRep
+     * enabled, even without the corresponding index setting.
      */
-    public boolean isSegRepEnabled() {
-        return ReplicationType.SEGMENT.equals(replicationType);
+    public boolean isSegRepEnabledOrRemoteNode() {
+        return ReplicationType.SEGMENT.equals(replicationType) || isRemoteNode();
     }
 
     public boolean isSegRepLocalEnabled() {
-        return isSegRepEnabled() && !isRemoteStoreEnabled();
+        return isSegRepEnabledOrRemoteNode() && !isRemoteStoreEnabled();
     }
 
     public boolean isSegRepWithRemoteEnabled() {
-        return isSegRepEnabled() && isRemoteStoreEnabled();
+        return isSegRepEnabledOrRemoteNode() && isRemoteStoreEnabled();
     }
 
     /**
@@ -1241,6 +1245,10 @@ public boolean isRemoteStoreEnabled() {
         return isRemoteStoreEnabled;
     }
 
+    public boolean isRemoteNode() {
+        return RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
+    }
+
     /**
      * Returns if remote translog store is enabled for this index.
      */
diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
index ce38dd3bb236c..297fe093f7f4e 100644
--- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
+++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
@@ -145,7 +145,9 @@ public void isSegrepLimitBreached(ShardId shardId) {
         final IndexService indexService = indicesService.indexService(shardId.getIndex());
         if (indexService != null) {
             final IndexShard shard = indexService.getShard(shardId.id());
-            if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) {
+            if (isSegmentReplicationBackpressureEnabled
+                && shard.indexSettings().isSegRepEnabledOrRemoteNode()
+                && shard.routingEntry().primary()) {
                 validateReplicationGroup(shard);
             }
         }
@@ -264,7 +266,8 @@ protected void runInternal() {
                             stats.getShardStats().get(shardId).getReplicaStats()
                         );
                         final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex());
-                        if (indexService.getIndexSettings() != null && indexService.getIndexSettings().isSegRepEnabled() == false) {
+                        if (indexService.getIndexSettings() != null
+                            && indexService.getIndexSettings().isSegRepEnabledOrRemoteNode() == false) {
                             return;
                         }
                         final IndexShard primaryShard = indexService.getShard(shardId.getId());
diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
index f5fc8aa1c1eea..e48a76c438057 100644
--- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
+++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
@@ -45,7 +45,7 @@ public SegmentReplicationStats getStats() {
         Map<ShardId, SegmentReplicationPerGroupStats> stats = new HashMap<>();
         for (IndexService indexService : indicesService) {
             for (IndexShard indexShard : indexService) {
-                if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) {
+                if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.routingEntry().primary()) {
                     stats.putIfAbsent(indexShard.shardId(), getStatsForShard(indexShard));
                 }
             }
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
index bf3e10d684c94..8106b65bddeec 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -244,7 +244,7 @@ private static void doValidateCodecSettings(final String codec) {
      * Creates a new {@link org.opensearch.index.engine.EngineConfig}
      */
     private EngineConfig(Builder builder) {
-        if (builder.isReadOnlyReplica && builder.indexSettings.isSegRepEnabled() == false) {
+        if (builder.isReadOnlyReplica && builder.indexSettings.isSegRepEnabledOrRemoteNode() == false) {
             throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
         }
         this.shardId = builder.shardId;
@@ -491,7 +491,7 @@ public LongSupplier getPrimaryTermSupplier() {
      * @return true if this engine should be wired as read only.
      */
     public boolean isReadOnlyReplica() {
-        return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
+        return indexSettings.isSegRepEnabledOrRemoteNode() && isReadOnlyReplica;
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index a25ec95f58e05..7bacec22fc850 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -710,7 +710,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
         final OpVsLuceneDocStatus status;
         VersionValue versionValue = getVersionFromMap(op.uid().bytes());
         assert incrementVersionLookup();
-        boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
+        boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
         if (versionValue != null) {
             status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
         } else {
@@ -1005,7 +1005,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
             assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
             plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0);
         } else {
-            boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
+            boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
             versionMap.enforceSafeAccess();
             final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
             if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
@@ -1452,7 +1452,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
             // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
             plan = DeletionStrategy.processButSkipLucene(false, delete.version());
         } else {
-            boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
+            boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
             final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
             if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
                 if (segRepEnabled) {
@@ -1868,7 +1868,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
                         // only after the active reader is updated. This ensures that a flush does not wipe out a required commit point file
                         // while we are
                         // in refresh listeners.
-                        final GatedCloseable<IndexCommit> latestCommit = engineConfig.getIndexSettings().isSegRepEnabled()
+                        final GatedCloseable<IndexCommit> latestCommit = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode()
                             ? acquireLastIndexCommit(false)
                             : null;
                         commitIndexWriter(indexWriter, translogManager.getTranslogUUID());
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index ed8dba2f8902d..1e1825e1f8ace 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -436,7 +436,8 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
                  This is not required for remote store implementations given on failover the replica re-syncs with the store
                  during promotion.
                  */
-                if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
+                if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false
+                    && engineConfig.getIndexSettings().isRemoteNode() == false) {
                     latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
                     latestSegmentInfos.changed();
                 }
diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java
index 9a146be96c9de..e4c7eb56d02c6 100644
--- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java
+++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreStatsTrackerFactory.java
@@ -68,7 +68,7 @@ public RemoteStoreStatsTrackerFactory(ClusterService clusterService, Settings se
 
     @Override
     public void afterIndexShardCreated(IndexShard indexShard) {
-        if (indexShard.indexSettings().isRemoteStoreEnabled() == false) {
+        if (indexShard.indexSettings().isRemoteStoreEnabled() == false && indexShard.indexSettings().isRemoteNode() == false) {
             return;
         }
         ShardId shardId = indexShard.shardId();
diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 7b9c1d3aa548f..0e625e9f30320 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -1059,7 +1059,7 @@ public ReplicationTracker(
         this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
         this.safeCommitInfoSupplier = safeCommitInfoSupplier;
         this.onReplicationGroupUpdated = onReplicationGroupUpdated;
-        this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null;
+        this.latestReplicationCheckpoint = indexSettings.isSegRepEnabledOrRemoteNode() ? ReplicationCheckpoint.empty(shardId) : null;
         assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
         assert invariant();
     }
@@ -1173,7 +1173,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI
      * @param visibleCheckpoint the visible checkpoint
      */
     public synchronized void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode();
         assert primaryMode;
         assert handoffInProgress == false;
         assert invariant();
@@ -1217,7 +1217,7 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
      * @param checkpoint {@link ReplicationCheckpoint}
      */
     public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode();
         if (checkpoint.equals(latestReplicationCheckpoint) == false) {
             this.latestReplicationCheckpoint = checkpoint;
         }
@@ -1269,7 +1269,7 @@ && isPrimaryRelocation(allocationId) == false
      * @param checkpoint {@link ReplicationCheckpoint}
      */
     public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode();
         if (checkpoint.equals(latestReplicationCheckpoint) == false) {
             this.latestReplicationCheckpoint = checkpoint;
         }
@@ -1294,7 +1294,7 @@ && isPrimaryRelocation(e.getKey()) == false
      * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group.
      */
     public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode();
         if (primaryMode) {
             return this.checkpoints.entrySet()
                 .stream()
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 977155a1cbb72..72ce858661031 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -234,6 +234,9 @@
 import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
 import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
 import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED;
+import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED;
+import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_NON_MIGRATING;
 import static org.opensearch.index.translog.Translog.Durability;
 import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;
 
@@ -346,6 +349,12 @@ Runnable getGlobalCheckpointSyncer() {
     private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
     private final RemoteStoreFileDownloader fileDownloader;
     private final RecoverySettings recoverySettings;
+    /*
+     On a source docrep node, it will be DOCREP_NON_MIGRATING.
+     On a source remote node, it will be REMOTE_MIGRATING_SEEDED when relocating from a remote node.
+     On a source remote node, it will be REMOTE_MIGRATING_UNSEEDED when relocating from a docrep node.
+    private final ShardMigrationState shardMigrationState;
 
     public IndexShard(
         final ShardRouting shardRouting,
@@ -374,7 +383,8 @@ public IndexShard(
         final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
         final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
         final String nodeId,
-        final RecoverySettings recoverySettings
+        final RecoverySettings recoverySettings,
+        boolean seedRemote
     ) throws IOException {
         super(shardRouting.shardId(), indexSettings);
         assert shardRouting.initializing();
@@ -394,7 +404,7 @@ public IndexShard(
             logger,
             threadPool,
             this::getEngine,
-            indexSettings.isRemoteTranslogStoreEnabled(),
+            indexSettings.isRemoteNode(),
             () -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier)
         );
         this.mapperService = mapperService;
@@ -472,6 +482,7 @@ public boolean shouldCache(Query query) {
         this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
         this.recoverySettings = recoverySettings;
         this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
+        this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
     }
 
     public ThreadPool getThreadPool() {
@@ -482,6 +493,20 @@ public Store store() {
         return this.store;
     }
 
+    public boolean isMigratingToRemote() {
+        // true only if the shard is on a remote store node but the index settings do not yet mark it as remote store enabled
+        return shardMigrationState == REMOTE_MIGRATING_UNSEEDED || shardMigrationState == REMOTE_MIGRATING_SEEDED;
+    }
+
+    public boolean shouldSeedRemoteStore() {
+        // true only when relocating from docrep to remote store, i.e. the remote store has not been seeded yet
+        return shardMigrationState == REMOTE_MIGRATING_UNSEEDED;
+    }
+
+    public boolean isRemoteSeeded() {
+        return shardMigrationState == REMOTE_MIGRATING_SEEDED;
+    }
+
     public Store remoteStore() {
         return this.remoteStore;
     }
@@ -625,7 +650,7 @@ public void updateShardState(
 
                 // Flush here after relocation of primary, so that replica get all changes from new primary rather than waiting for more
                 // docs to get indexed.
-                if (indexSettings.isSegRepEnabled()) {
+                if (indexSettings.isSegRepEnabledOrRemoteNode()) {
                     flush(new FlushRequest().waitIfOngoing(true).force(true));
                 }
             } else if (currentRouting.primary()
@@ -705,7 +730,7 @@ public void updateShardState(
                             + newRouting;
                         assert getOperationPrimaryTerm() == newPrimaryTerm;
                         try {
-                            if (indexSettings.isSegRepEnabled()) {
+                            if (indexSettings.isSegRepEnabledOrRemoteNode()) {
                                 // this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
                                 assert newRouting.primary() && currentRouting.primary() == false;
                                 ReplicationTimer timer = new ReplicationTimer();
@@ -725,7 +750,7 @@ public void updateShardState(
                             }
 
                             replicationTracker.activatePrimaryMode(getLocalCheckpoint());
-                            if (indexSettings.isSegRepEnabled()) {
+                            if (indexSettings.isSegRepEnabledOrRemoteNode()) {
                                 // force publish a checkpoint once in primary mode so that replicas not caught up to previous primary
                                 // are brought up to date.
                                 checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
@@ -839,8 +864,8 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
      * relocated. After all operations are successfully blocked, performSegRep is executed followed by target relocation
      * handoff.
      *
+     * @param consumer      a {@link Runnable} that is executed after performSegRep
      * @param performSegRep a {@link Runnable} that is executed after operations are blocked
-     * @param consumer a {@link Runnable} that is executed after performSegRep
      * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
      * @throws IllegalStateException           if the relocation target is no longer part of the replication group
      * @throws InterruptedException            if blocking operations is interrupted
@@ -858,7 +883,8 @@ public void relocated(
             indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
                 forceRefreshes.close();
 
-                boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
+                boolean syncTranslog = (isRemoteTranslogEnabled() || this.isMigratingToRemote())
+                    && Durability.ASYNC == indexSettings.getTranslogDurability();
                 // Since all the index permits are acquired at this point, the translog buffer will not change.
                 // It is safe to perform sync of translogs now as this will ensure for remote-backed indexes, the
                 // translogs has been uploaded to the remote store.
@@ -881,6 +907,7 @@ public void relocated(
                     : "in-flight operations in progress while moving shard state to relocated";
 
                 performSegRep.run();
+
                 /*
                  * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
                  * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
@@ -1041,7 +1068,7 @@ private Engine.IndexResult applyIndexOperation(
 
         // For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary
         // shard.
-        if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
+        if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary() == false) {
             Engine.Index index = new Engine.Index(
                 new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
                 new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
@@ -1240,7 +1267,7 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(
     }
 
     public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException {
-        if (indexSettings.isSegRepEnabled()) {
+        if (indexSettings.isSegRepEnabledOrRemoteNode()) {
             final Engine.Delete delete = new Engine.Delete(
                 id,
                 new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
@@ -1435,12 +1462,12 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
         SegmentsStats segmentsStats = getEngine().segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
         segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
         // Populate remote_store stats only if the index is remote store backed
-        if (indexSettings.isRemoteStoreEnabled()) {
+        if (indexSettings().isRemoteNode()) {
             segmentsStats.addRemoteSegmentStats(
                 new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats())
             );
         }
-        if (indexSettings.isSegRepEnabled()) {
+        if (indexSettings.isSegRepEnabledOrRemoteNode()) {
             segmentsStats.addReplicationStats(getReplicationStats());
         }
         return segmentsStats;
@@ -1457,7 +1484,7 @@ public FieldDataStats fieldDataStats(String... fields) {
     public TranslogStats translogStats() {
         TranslogStats translogStats = getEngine().translogManager().getTranslogStats();
         // Populate remote_store stats only if the index is remote store backed
-        if (indexSettings.isRemoteStoreEnabled()) {
+        if (indexSettings.isRemoteNode()) {
             translogStats.addRemoteTranslogStats(
                 new RemoteTranslogStats(remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardId).stats())
             );
@@ -1496,7 +1523,7 @@ public void flush(FlushRequest request) {
      * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
      */
     public void trimTranslog() {
-        if (isRemoteTranslogEnabled()) {
+        if (indexSettings.isRemoteNode()) {
             return;
         }
         verifyNotClosed();
@@ -1661,7 +1688,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
      *
      */
     public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode();
 
         // do not close the snapshot - caller will close it.
         GatedCloseable<SegmentInfos> snapshot = null;
@@ -1720,7 +1747,7 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th
      * @return - True if the shard is able to perform segment replication.
      */
     public boolean isSegmentReplicationAllowed() {
-        if (indexSettings.isSegRepEnabled() == false) {
+        if (indexSettings.isSegRepEnabledOrRemoteNode() == false) {
             logger.trace("Attempting to perform segment replication when it is not enabled on the index");
             return false;
         }
@@ -2016,7 +2043,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
     ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003
      */
     public RemoteSegmentStoreDirectory getRemoteDirectory() {
-        assert indexSettings.isRemoteStoreEnabled();
+        assert indexSettings.isRemoteNode();
         assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
         FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
         FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
@@ -2028,8 +2055,8 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() {
      * Returns true iff it is able to verify that remote segment store
      * is in sync with local
      */
-    boolean isRemoteSegmentStoreInSync() {
-        assert indexSettings.isRemoteStoreEnabled();
+    public boolean isRemoteSegmentStoreInSync() {
+        assert indexSettings.isRemoteNode();
         try {
             RemoteSegmentStoreDirectory directory = getRemoteDirectory();
             if (directory.readLatestMetadataFile() != null) {
@@ -2059,6 +2086,46 @@ boolean isRemoteSegmentStoreInSync() {
         return false;
     }
 
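+    /*
+    Blocks the calling thread until the remote store is in sync with the local store, or the internal remote upload timeout elapses
+    */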
+    public void waitForRemoteStoreSync() {
+        waitForRemoteStoreSync(() -> {});
+    }
+
+    /*
+    Blocks the calling thread, waiting for the remote store to get synced, until the internal remote upload timeout elapses.
+    Calls onProgress whenever an increased uploaded-file count is observed on the remote store.
+    */
+    public void waitForRemoteStoreSync(Runnable onProgress) {
+        assert indexSettings.isRemoteNode();
+        RemoteSegmentStoreDirectory directory = getRemoteDirectory();
+        int segmentUploadedCount = 0;
+        if (shardRouting.primary() == false) {
+            return;
+        }
+        long startNanos = System.nanoTime();
+
+        while (System.nanoTime() - startNanos < getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
+            try {
+                if (isRemoteSegmentStoreInSync()) {
+                    break;
+                } else {
+                    if (directory.getSegmentsUploadedToRemoteStore().size() > segmentUploadedCount) {
+                        onProgress.run();
+                        logger.debug("Uploaded segment count {}", directory.getSegmentsUploadedToRemoteStore().size());
+                        segmentUploadedCount = directory.getSegmentsUploadedToRemoteStore().size();
+                    }
+                    try {
+                        Thread.sleep(TimeValue.timeValueSeconds(30).millis());
+                    } catch (InterruptedException ie) {
+                        throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
+                    }
+                }
+            } catch (AlreadyClosedException e) {
+                // There is no point in waiting as the shard is now closed.
+                return;
+            }
+        }
+    }
+
     public void preRecovery() {
         final IndexShardState currentState = this.state; // single volatile read
         if (currentState == IndexShardState.CLOSED) {
@@ -2203,7 +2270,7 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
      * @return the starting sequence number from which the recovery should start.
      */
     private long recoverLocallyUptoLastCommit() {
-        assert isRemoteTranslogEnabled() : "Remote translog store is not enabled";
+        assert indexSettings.isRemoteNode() : "Remote translog store is not enabled";
         long seqNo;
         validateLocalRecoveryState();
 
@@ -2449,7 +2516,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
         synchronized (engineMutex) {
             assert currentEngineReference.get() == null : "engine is running";
             verifyNotClosed();
-            if (indexSettings.isRemoteStoreEnabled()) {
+            if (indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded()) {
                 // Download missing segments from remote segment store.
                 if (syncFromRemote) {
                     syncSegmentsFromRemoteSegmentStore(false);
@@ -2488,7 +2555,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
             onNewEngine(newEngine);
             currentEngineReference.set(newEngine);
 
-            if (indexSettings.isSegRepEnabled()) {
+            if (indexSettings.isSegRepEnabledOrRemoteNode()) {
                 // set initial replication checkpoints into tracker.
                 updateReplicationCheckpoint();
             }
@@ -2900,7 +2967,7 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo,
      * This method should only be invoked if Segment Replication or Remote Store is not enabled.
      */
     public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException {
-        assert (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) == false
+        assert indexSettings.isSegRepEnabledOrRemoteNode() == false
             : "unsupported operation for segment replication enabled indices or remote store backed indices";
         return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true);
     }
@@ -3067,7 +3134,7 @@ public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas()
     }
 
     public ReplicationStats getReplicationStats() {
-        if (indexSettings.isSegRepEnabled() && routingEntry().primary()) {
+        if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) {
             final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
             long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
             long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
@@ -3446,7 +3513,14 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
             + "] does not contain relocation target ["
             + routingEntry()
             + "]";
-        assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint()
+        String allocationId = routingEntry().allocationId().getId();
+        if (isRemoteStoreEnabled() || isMigratingToRemote()) {
+            // For remote backed indexes, the old primary may not have the updated value of the new primary's local checkpoint.
+            // But the new primary is always updated with data from the remote store and is at par with the old primary.
+            // So we can use a stricter check where the new primary's local checkpoint is compared against that of the old primary.
+            allocationId = primaryContext.getRoutingTable().primaryShard().allocationId().getId();
+        }
+        assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(allocationId).getLocalCheckpoint()
             || indexSettings().getTranslogDurability() == Durability.ASYNC : "local checkpoint ["
                 + getLocalCheckpoint()
                 + "] does not match checkpoint from primary context ["
@@ -3459,7 +3533,7 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingE
     }
 
     private void postActivatePrimaryMode() {
-        if (indexSettings.isRemoteStoreEnabled()) {
+        if (indexSettings.isRemoteNode()) {
             // We make sure to upload translog (even if it does not contain any operations) to remote translog.
             // This helps to get a consistent state in remote store where both remote segment store and remote
             // translog contains data.
@@ -3846,14 +3920,14 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
 
         internalRefreshListener.clear();
         internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
-        if (indexSettings.isSegRepEnabled()) {
+        if (indexSettings.isSegRepEnabledOrRemoteNode()) {
             internalRefreshListener.add(new ReplicationCheckpointUpdater());
         }
         if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
             internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
         }
 
-        if (isRemoteStoreEnabled()) {
+        if (isRemoteStoreEnabled() || isMigratingToRemote()) {
             internalRefreshListener.add(
                 new RemoteStoreRefreshListener(
                     this,
@@ -3867,10 +3941,15 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
           With segment replication enabled for primary relocation, recover replica shard initially as read only and
           change to a writeable engine during relocation handoff after a round of segment replication.
          */
-        boolean isReadOnlyReplica = indexSettings.isSegRepEnabled()
+        boolean isReadOnlyReplica = indexSettings.isSegRepEnabledOrRemoteNode()
             && (shardRouting.primary() == false
                 || (shardRouting.isRelocationTarget() && recoveryState.getStage() != RecoveryState.Stage.FINALIZE));
 
+        // For mixed mode, when relocating from doc rep to remote node, we use a writeable engine
+        if (shouldSeedRemoteStore()) {
+            isReadOnlyReplica = false;
+        }
+
         return this.engineConfigFactory.newEngineConfig(
             shardId,
             threadPool,
@@ -3895,7 +3974,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
             this::getOperationPrimaryTerm,
             tombstoneDocSupplier(),
             isReadOnlyReplica,
-            this::isStartedPrimary,
+            this::enableUploadToRemoteTranslog,
             translogFactorySupplier.apply(indexSettings, shardRouting),
             isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for
             // timeseries
@@ -3916,7 +3995,24 @@ public boolean isRemoteTranslogEnabled() {
      * translog uploads.
      */
     public boolean isStartedPrimary() {
-        return getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED;
+        return (getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED);
+    }
+
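+    /*
+    Translog uploads are allowed for a started primary, or, during docrep to remote store migration,
+    once at least one remote segment upload has happened for the shard
+    */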
+    public boolean enableUploadToRemoteTranslog() {
+        return isStartedPrimary() || (shouldSeedRemoteStore() && hasOneRemoteSegmentSyncHappened());
+    }
+
+    private boolean hasOneRemoteSegmentSyncHappened() {
+        assert indexSettings.isRemoteNode();
+        // We upload remote translog only after one remote segment upload in case of migration
+        RemoteSegmentStoreDirectory rd = getRemoteDirectory();
+        AtomicBoolean segment_n_uploaded = new AtomicBoolean(false);
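+        // A segments_N file among the uploaded files indicates that at least one segment commit has reached the remote store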
+        rd.getSegmentsUploadedToRemoteStore().forEach((key, value) -> {
+            if (key.startsWith("segments")) {
+                segment_n_uploaded.set(true);
+            }
+        });
+        return segment_n_uploaded.get();
     }
 
     /**
@@ -4229,7 +4325,7 @@ private void innerAcquireReplicaOperationPermit(
                         );
                         // With Segment Replication enabled, we never want to reset a replica's engine unless
                         // it is promoted to primary.
-                        if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
+                        if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabledOrRemoteNode() == false) {
                             resetEngineToGlobalCheckpoint();
                         } else {
                             getEngine().translogManager().rollTranslogGeneration();
@@ -4521,10 +4617,10 @@ public final boolean isSearchIdle() {
     public final boolean isSearchIdleSupported() {
         // If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
         // task continues to upload to remote store periodically.
-        if (isRemoteTranslogEnabled()) {
+        if (isRemoteTranslogEnabled() || indexSettings.isRemoteNode()) {
             return false;
         }
-        return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
+        return indexSettings.isSegRepEnabledOrRemoteNode() == false || indexSettings.getNumberOfReplicas() == 0;
     }
 
     /**
@@ -4786,10 +4882,10 @@ public void close() throws IOException {
                 }
             };
             IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-            if (indexSettings.isRemoteStoreEnabled()) {
+            if (indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded()) {
                 syncSegmentsFromRemoteSegmentStore(false);
             }
-            if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
+            if ((indexSettings.isRemoteTranslogStoreEnabled() || this.isRemoteSeeded()) && shardRouting.primary()) {
                 syncRemoteTranslogAndUpdateGlobalCheckpoint();
             }
             newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
@@ -4808,7 +4904,9 @@ public void close() throws IOException {
         // of truth for translog, we play all translogs that exists locally. Otherwise, the recoverUpto happens upto global checkpoint.
         // We also replay all local translog ops with Segment replication, because on engine swap our local translog may
         // hold more ops than the global checkpoint.
-        long recoverUpto = this.isRemoteTranslogEnabled() || indexSettings().isSegRepEnabled() ? Long.MAX_VALUE : globalCheckpoint;
+        long recoverUpto = this.isRemoteTranslogEnabled() || indexSettings().isSegRepEnabledOrRemoteNode()
+            ? Long.MAX_VALUE
+            : globalCheckpoint;
         newEngineReference.get()
             .translogManager()
             .recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), recoverUpto);
@@ -4837,6 +4935,16 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
         RemoteFsTranslog.cleanup(repository, shardId, getThreadPool());
     }
 
+    /*
+    Cleans up remote store and remote translog contents.
+    This is used in remote store migration, where we want to clean up all stale segment and translog data
+    and seed the remote store afresh
+     */
+    public void deleteRemoteStoreContents() throws IOException {
+        deleteTranslogFilesFromRemoteTranslog();
+        getRemoteDirectory().deleteStaleSegments(0);
+    }
+
     public void syncTranslogFilesFromRemoteTranslog() throws IOException {
         TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
         assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
@@ -4862,7 +4970,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
     public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
         boolean syncSegmentSuccess = false;
         long startTimeMs = System.currentTimeMillis();
-        assert indexSettings.isRemoteStoreEnabled();
+        assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
         logger.trace("Downloading segments from remote segment store");
         RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
         // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
@@ -5127,4 +5235,20 @@ private TimeValue getRemoteTranslogUploadBufferInterval(Supplier<TimeValue> clus
     public AsyncIOProcessor<Translog.Location> getTranslogSyncProcessor() {
         return translogSyncProcessor;
     }
+
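+    /*
+    Denotes the migration-related state of a shard: remote store backed and not migrating, migrating from docrep
+    to remote store (seeded or not yet seeded), or docrep and not migrating
+    */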
+    enum ShardMigrationState {
+        REMOTE_NON_MIGRATING,
+        REMOTE_MIGRATING_SEEDED,
+        REMOTE_MIGRATING_UNSEEDED,
+        DOCREP_NON_MIGRATING
+    }
+
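+    // Derives the migration state from the index settings: a remote-store-enabled index on a remote node is not migrating,
+    // any other index on a remote node is migrating (unseeded if it still needs seeding), everything else stays docrep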
+    static ShardMigrationState getShardMigrationState(IndexSettings indexSettings, boolean shouldSeed) {
+        if (indexSettings.isRemoteNode() && indexSettings.isRemoteStoreEnabled()) {
+            return REMOTE_NON_MIGRATING;
+        } else if (indexSettings.isRemoteNode()) {
+            return shouldSeed ? REMOTE_MIGRATING_UNSEEDED : REMOTE_MIGRATING_SEEDED;
+        }
+        return ShardMigrationState.DOCREP_NON_MIGRATING;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 7bb80b736693f..fb96102bc6094 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -528,7 +528,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
      * @return true iff the shard is a started with primary mode true or it is local or snapshot recovery.
      */
     private boolean isReadyForUpload() {
-        boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery();
+        boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery() || indexShard.shouldSeedRemoteStore();
 
         if (isReady == false) {
             StringBuilder sb = new StringBuilder("Skipped syncing segments with");
diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
index 3faef2da05320..5f09b1a0802f3 100644
--- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -38,13 +38,11 @@
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.search.Sort;
-import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.opensearch.ExceptionsHelper;
-import org.opensearch.OpenSearchException;
 import org.opensearch.action.StepListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.MappingMetadata;
@@ -194,7 +192,7 @@ void recoverFromLocalShards(
                     // copied segments - we will also see them in stats etc.
                     indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID());
                     if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
-                        waitForRemoteStoreSync(indexShard);
+                        indexShard.waitForRemoteStoreSync();
                         if (indexShard.isRemoteSegmentStoreInSync() == false) {
                             throw new IndexShardRecoveryException(
                                 indexShard.shardId(),
@@ -436,7 +434,7 @@ void recoverFromSnapshotAndRemoteStore(
                 indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
                 indexShard.finalizeRecovery();
                 if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
-                    waitForRemoteStoreSync(indexShard);
+                    indexShard.waitForRemoteStoreSync();
                     if (indexShard.isRemoteSegmentStoreInSync() == false) {
                         listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
                         return;
@@ -722,7 +720,7 @@ private void restore(
             indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
             indexShard.finalizeRecovery();
             if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
-                waitForRemoteStoreSync(indexShard);
+                indexShard.waitForRemoteStoreSync();
                 if (indexShard.isRemoteSegmentStoreInSync() == false) {
                     listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
                     return;
@@ -796,31 +794,4 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
         );
         store.associateIndexWithNewTranslog(translogUUID);
     }
-
-    /*
-    Blocks the calling thread,  waiting for the remote store to get synced till internal Remote Upload Timeout
-     */
-    private void waitForRemoteStoreSync(IndexShard indexShard) {
-        if (indexShard.shardRouting.primary() == false) {
-            return;
-        }
-        long startNanos = System.nanoTime();
-
-        while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
-            try {
-                if (indexShard.isRemoteSegmentStoreInSync()) {
-                    break;
-                } else {
-                    try {
-                        Thread.sleep(TimeValue.timeValueMinutes(1).seconds());
-                    } catch (InterruptedException ie) {
-                        throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
-                    }
-                }
-            } catch (AlreadyClosedException e) {
-                // There is no point in waiting as shard is now closed .
-                return;
-            }
-        }
-    }
 }
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 1930a37daa400..0992d86d6f0aa 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -385,7 +385,7 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
      * @return {@link Map} map file name to {@link StoreFileMetadata}.
      */
     public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode();
         failIfCorrupted();
         try {
             return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
@@ -893,7 +893,7 @@ public void beforeClose() {
      * @throws IOException when there is an IO error committing.
      */
     public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, long processedCheckpoint) throws IOException {
-        assert indexSettings.isSegRepEnabled();
+        assert indexSettings.isSegRepEnabledOrRemoteNode() || indexSettings.isRemoteNode();
         metadataLock.writeLock().lock();
         try {
             final Map<String, String> userData = new HashMap<>(latestSegmentInfos.getUserData());
diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
index 7b969a37e4aa6..43eec01b2d365 100644
--- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
+++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -334,7 +334,7 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
     }
 
     private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException {
-        logger.trace("uploading translog for {} {}", primaryTerm, generation);
+        logger.trace("uploading translog for primary term {} generation {}", primaryTerm, generation);
         try (
             TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
                 primaryTerm,
diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java
index 9f877e87415dd..e78300e368099 100644
--- a/server/src/main/java/org/opensearch/index/translog/Translog.java
+++ b/server/src/main/java/org/opensearch/index/translog/Translog.java
@@ -525,7 +525,7 @@ TranslogWriter createWriter(
                 tragedy,
                 persistedSequenceNumberConsumer,
                 bigArrays,
-                indexSettings.isRemoteTranslogStoreEnabled()
+                indexSettings.isRemoteNode()
             );
         } catch (final IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 0e64894e6f708..9bc81c1826c2d 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -150,6 +150,7 @@
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.node.Node;
+import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
 import org.opensearch.plugins.IndexStorePlugin;
 import org.opensearch.plugins.PluginsService;
 import org.opensearch.repositories.RepositoriesService;
@@ -201,6 +202,7 @@
 import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
 import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION;
 import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
 import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;
 
 /**
@@ -503,7 +505,12 @@ protected void closeInternal() {
         this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
         clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
         this.remoteDirectoryFactory = remoteDirectoryFactory;
-        this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool, remoteStoreStatsTrackerFactory);
+        this.translogFactorySupplier = getTranslogFactorySupplier(
+            repositoriesServiceSupplier,
+            threadPool,
+            remoteStoreStatsTrackerFactory,
+            settings
+        );
         this.searchRequestStats = searchRequestStats;
         this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
         clusterService.getClusterSettings()
@@ -533,7 +540,8 @@ private void onRefreshIntervalUpdate(TimeValue clusterDefaultRefreshInterval) {
     private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTranslogFactorySupplier(
         Supplier<RepositoriesService> repositoriesServiceSupplier,
         ThreadPool threadPool,
-        RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
+        RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
+        Settings settings
     ) {
         return (indexSettings, shardRouting) -> {
             if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
@@ -543,6 +551,13 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
                     indexSettings.getRemoteStoreTranslogRepository(),
                     remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
                 );
+            } else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
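+                // During docrep to remote store migration the index settings may not yet mark the translog store as remote,
+                // so fall back to the translog repository configured through the remote store node attributes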
+                return new RemoteBlobStoreInternalTranslogFactory(
+                    repositoriesServiceSupplier,
+                    threadPool,
+                    RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
+                    remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
+                );
             }
             return new InternalTranslogFactory();
         };
@@ -932,7 +947,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
             if (idxSettings.isRemoteSnapshot()) {
                 return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
             }
-            if (idxSettings.isSegRepEnabled()) {
+            if (idxSettings.isSegRepEnabledOrRemoteNode() || idxSettings.isRemoteNode()) {
                 return new NRTReplicationEngineFactory();
             }
             return new InternalEngineFactory();
@@ -1032,7 +1047,10 @@ public IndexShard createShard(
             globalCheckpointSyncer,
             retentionLeaseSyncer,
             checkpointPublisher,
-            remoteStoreStatsTrackerFactory
+            remoteStoreStatsTrackerFactory,
+            repositoriesService,
+            targetNode,
+            sourceNode
         );
         indexShard.addShardFailureCallback(onShardFailure);
         indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java
index 29ee097d36cac..fac6924435cf3 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java
@@ -161,7 +161,7 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
                 + "] in "
                 + Arrays.toString(store.directory().listAll());
             // With Segment Replication, we will fsync after a full commit has been received.
-            if (store.indexSettings().isSegRepEnabled() == false) {
+            if (store.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
                 store.directory().sync(Collections.singleton(temporaryFileName));
             }
             IndexOutput remove = removeOpenIndexOutputs(name);
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java
index cb2bedf00de99..30f517fda9931 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java
@@ -377,7 +377,7 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
                     request.targetNode(),
                     recoverySettings,
                     throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),
-                    shard.isRemoteTranslogEnabled()
+                    shard.isRemoteTranslogEnabled() || request.targetNode().isRemoteStoreNode()
                 );
                 handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
                 return Tuple.tuple(handler, recoveryTarget);
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
index 4232d32987e86..227496f72f83d 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -189,7 +189,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
     public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
         // create a new recovery status, and process...
         final long recoveryId = onGoingRecoveries.start(
-            new RecoveryTarget(indexShard, sourceNode, listener),
+            new RecoveryTarget(indexShard, sourceNode, listener, threadPool),
             recoverySettings.activityTimeout()
         );
         // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
@@ -246,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
                     logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                     indexShard.prepareForIndexRecovery();
                     final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
-                    if (hasRemoteSegmentStore) {
+                    if (hasRemoteSegmentStore || indexShard.isRemoteSeeded()) {
                         // ToDo: This is a temporary mitigation to not fail the peer recovery flow in case there is
                         // an exception while downloading segments from remote store. For remote backed indexes, we
                         // plan to revamp this flow so that node-node segment copy will not happen.
@@ -260,7 +260,8 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
                             );
                         }
                     }
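+                    // A replica on a remote store backed node (including one migrating from docrep) recovers its translog from the remote store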
-                    final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
+                    final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false
+                        && indexShard.indexSettings().isRemoteNode();
                     final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
                     final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog || hasRemoteSegmentStore) == false;
                     final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
index 7996c48b2b04b..abf9b1aaeb2cc 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java
@@ -841,9 +841,11 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
 
             if (request.isPrimaryRelocation()) {
                 logger.trace("performing relocation hand-off");
-                final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled()
-                    ? recoveryTarget::forceSegmentFileSync
-                    : () -> {};
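+                // Also force a round of segment file sync when relocating a primary between two remote store nodes during migration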
+                final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabledOrRemoteNode()
+                    || (request.sourceNode().isRemoteStoreNode() && request.targetNode().isRemoteStoreNode())
+                        ? recoveryTarget::forceSegmentFileSync
+                        : () -> {};
+
                 // TODO: make relocated async
                 // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
                 cancellableThreads.execute(
@@ -855,7 +857,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
                  */
             } else {
                 // Force round of segment replication to update its checkpoint to primary's
-                if (shard.indexSettings().isSegRepEnabled()) {
+                if (shard.indexSettings().isSegRepEnabledOrRemoteNode()) {
                     cancellableThreads.execute(recoveryTarget::forceSegmentFileSync);
                 }
             }
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java
index ea13ca18bbfca..0ccb1ac2133cf 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java
@@ -23,7 +23,8 @@ public static RecoverySourceHandler create(
         StartRecoveryRequest request,
         RecoverySettings recoverySettings
     ) {
-        boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false && shard.isRemoteTranslogEnabled();
+        boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false
+            && (shard.isRemoteTranslogEnabled() || shard.isMigratingToRemote());
         if (isReplicaRecoveryWithRemoteTranslog) {
             return new RemoteStorePeerRecoverySourceHandler(
                 shard,
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
index f3b5d0d790f83..16311d5d2cfb7 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
@@ -61,6 +61,7 @@
 import org.opensearch.indices.replication.common.ReplicationListener;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.replication.common.ReplicationTarget;
+import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -87,16 +88,20 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
     // latch that can be used to blockingly wait for RecoveryTarget to be closed
     private final CountDownLatch closedLatch = new CountDownLatch(1);
 
+    private final ThreadPool threadPool;
+
     /**
      * Creates a new recovery target object that represents a recovery to the provided shard.
      *
-     * @param indexShard                        local shard where we want to recover to
-     * @param sourceNode                        source node of the recovery where we recover from
-     * @param listener                          called when recovery is completed/failed
+     * @param indexShard local shard where we want to recover to
+     * @param sourceNode source node of the recovery where we recover from
+     * @param listener   called when recovery is completed/failed
+     * @param threadPool threadpool instance
      */
-    public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
+    public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener, ThreadPool threadPool) {
         super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
         this.sourceNode = sourceNode;
+        this.threadPool = threadPool;
         indexShard.recoveryStats().incCurrentAsTarget();
         final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
         this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
@@ -108,7 +113,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati
      * @return a copy of this recovery target
      */
     public RecoveryTarget retryCopy() {
-        return new RecoveryTarget(indexShard, sourceNode, listener);
+        return new RecoveryTarget(indexShard, sourceNode, listener, threadPool);
     }
 
     public String source() {
@@ -209,6 +214,15 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
             state().getIndex().setFileDetailsComplete(); // ops-based recoveries don't send the file details
             state().getTranslog().totalOperations(totalTranslogOps);
             indexShard().openEngineAndSkipTranslogRecovery();
+            // upload to remote store in migration for primary shard
+            if (indexShard.shouldSeedRemoteStore() && indexShard.routingEntry().primary()) {
+                indexShard.deleteRemoteStoreContents();
+                // This cleans up remote translog's 0 generation, as we don't want to get that uploaded
+                indexShard.sync();
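+                // Trigger a refresh off the recovery thread so segment upload to the remote store can begin without blocking recovery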
+                threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { indexShard.refresh("remote store migration"); });
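+                // Pass setLastAccessTime as the progress callback so the recovery is not timed out while segments are being uploaded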
+                indexShard.waitForRemoteStoreSync(this::setLastAccessTime);
+                logger.info("Remote Store is now seeded for {}", indexShard.shardId());
+            }
             return null;
         });
     }
@@ -360,7 +374,7 @@ public void cleanFiles(
                 // Replicas for segment replication or remote snapshot indices do not create
                 // their own commit points and therefore do not modify the commit user data
                 // in their store. In these cases, reuse the primary's translog UUID.
-                final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled()
+                final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
                     || indexShard.indexSettings().isRemoteSnapshot();
                 if (reuseTranslogUUID) {
                     final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY);
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
index 4062f9702fb3a..a393faabae0ea 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java
@@ -175,7 +175,7 @@ public void clusterChanged(ClusterChangedEvent event) {
         // we need to ensure its state has cleared up in ongoing replications.
         if (event.routingTableChanged()) {
             for (IndexService indexService : indicesService) {
-                if (indexService.getIndexSettings().isSegRepEnabled()) {
+                if (indexService.getIndexSettings().isSegRepEnabledOrRemoteNode()) {
                     for (IndexShard indexShard : indexService) {
                         if (indexShard.routingEntry().primary()) {
                             final IndexMetadata indexMetadata = indexService.getIndexSettings().getIndexMetadata();
@@ -221,7 +221,7 @@ protected void doClose() throws IOException {
      */
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
-        if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) {
+        if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) {
             ongoingSegmentReplications.cancel(indexShard, "shard is closed");
         }
     }
@@ -231,7 +231,10 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
      */
     @Override
     public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
-        if (indexShard != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) {
+        if (indexShard != null
+            && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
+            && oldRouting.primary() == false
+            && newRouting.primary()) {
             ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
         }
     }
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index f28f829545d59..4942d39cfa48a 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -168,7 +168,8 @@ protected void doClose() throws IOException {
     public void clusterChanged(ClusterChangedEvent event) {
         if (event.routingTableChanged()) {
             for (IndexService indexService : indicesService) {
-                if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) {
+                if (indexService.getIndexSettings().isSegRepEnabledOrRemoteNode()
+                    && event.indexRoutingTableChanged(indexService.index().getName())) {
                     for (IndexShard shard : indexService) {
                         if (shard.routingEntry().primary() == false) {
                             // for this shard look up its primary routing, if it has completed a relocation trigger replication
@@ -197,7 +198,7 @@ public void clusterChanged(ClusterChangedEvent event) {
      */
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
-        if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) {
+        if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) {
             onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing");
             latestReceivedCheckpoint.remove(shardId);
         }
@@ -209,7 +210,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
      */
     @Override
     public void afterIndexShardStarted(IndexShard indexShard) {
-        if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary() == false) {
+        if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.routingEntry().primary() == false) {
             processLatestReceivedCheckpoint(indexShard, Thread.currentThread());
         }
     }
@@ -219,7 +220,10 @@ public void afterIndexShardStarted(IndexShard indexShard) {
      */
     @Override
     public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
-        if (oldRouting != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) {
+        if (oldRouting != null
+            && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
+            && oldRouting.primary() == false
+            && newRouting.primary()) {
             onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary");
             latestReceivedCheckpoint.remove(indexShard.shardId());
         }
diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java
index 7575c6ff5fb34..a3bfe1195d8cc 100644
--- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java
+++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java
@@ -168,6 +168,14 @@ public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
             .isEmpty() == false;
     }
 
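+    // Returns the remote segment store repository name configured through the remote store node attributes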
+    public static String getRemoteStoreSegmentRepo(Settings settings) {
+        return settings.get(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
+    }
+
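+    // Returns the remote translog repository name configured through the remote store node attributes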
+    public static String getRemoteStoreTranslogRepo(Settings settings) {
+        return settings.get(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
+    }
+
     public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
         return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
             && isRemoteClusterStateAttributePresent(settings);
diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java
index 1c25d8c71f948..89f1ea142336e 100644
--- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java
@@ -381,7 +381,7 @@ private void snapshot(
             if (indexShard.routingEntry().primary() == false) {
                 throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
             }
-            if (indexShard.indexSettings().isSegRepEnabled() && indexShard.isPrimaryMode() == false) {
+            if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.isPrimaryMode() == false) {
                 throw new IndexShardSnapshotFailedException(
                     shardId,
                     "snapshot triggered on a new primary following failover and cannot proceed until promotion is complete"
diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java
index 33e08a482b9c3..ec1600094084a 100644
--- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java
@@ -142,7 +142,7 @@ public void run() {
             IndexShard replica = shards.addReplica();
             Future<Void> future = shards.asyncRecoverReplica(
                 replica,
-                (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
+                (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, threadPool) {
                     @Override
                     public void cleanFiles(
                         int totalTranslogOps,
@@ -223,17 +223,20 @@ public IndexResult index(Index op) throws IOException {
             });
             thread.start();
             IndexShard replica = shards.addReplica();
-            Future<Void> fut = shards.asyncRecoverReplica(replica, (shard, node) -> new RecoveryTarget(shard, node, recoveryListener) {
-                @Override
-                public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
-                    try {
-                        indexedOnPrimary.await();
-                    } catch (InterruptedException e) {
-                        throw new AssertionError(e);
+            Future<Void> fut = shards.asyncRecoverReplica(
+                replica,
+                (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, threadPool) {
+                    @Override
+                    public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
+                        try {
+                            indexedOnPrimary.await();
+                        } catch (InterruptedException e) {
+                            throw new AssertionError(e);
+                        }
+                        super.prepareForTranslogOperations(totalTranslogOps, listener);
                     }
-                    super.prepareForTranslogOperations(totalTranslogOps, listener);
                 }
-            });
+            );
             fut.get();
             recoveryDone.countDown();
             thread.join();
diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java
index 17b5440ab5424..b891ac63378ac 100644
--- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java
@@ -72,6 +72,7 @@
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.recovery.RecoveryTarget;
 import org.opensearch.indices.replication.common.ReplicationListener;
+import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -118,7 +119,8 @@ public void testIndexingDuringFileRecovery() throws Exception {
                     indexShard,
                     node,
                     recoveryListener,
-                    logger
+                    logger,
+                    threadPool
                 )
             );
 
@@ -482,7 +484,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
             AtomicBoolean recoveryDone = new AtomicBoolean(false);
             final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
                 recoveryStart.countDown();
-                return new RecoveryTarget(indexShard, node, recoveryListener) {
+                return new RecoveryTarget(indexShard, node, recoveryListener, threadPool) {
                     @Override
                     public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
                         recoveryDone.set(true);
@@ -536,7 +538,7 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) {
             final IndexShard replica = shards.addReplica();
             final Future<Void> recoveryFuture = shards.asyncRecoverReplica(
                 replica,
-                (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
+                (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, threadPool) {
                     @Override
                     public void indexTranslogOperations(
                         final List<Translog.Operation> operations,
@@ -812,9 +814,10 @@ public BlockingTarget(
             IndexShard shard,
             DiscoveryNode sourceNode,
             ReplicationListener listener,
-            Logger logger
+            Logger logger,
+            ThreadPool threadPool
         ) {
-            super(shard, sourceNode, listener);
+            super(shard, sourceNode, listener, threadPool);
             this.recoveryBlocked = recoveryBlocked;
             this.releaseRecovery = releaseRecovery;
             this.stageToBlock = stageToBlock;
diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
index 46be10ce62840..537bfcf8f8a6b 100644
--- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
@@ -3208,7 +3208,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException {
 
         indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}");
         IndexShard replica = newShard(primary.shardId(), false, "n2", metadata, null);
-        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener) {
+        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener, threadPool) {
             @Override
             public void indexTranslogOperations(
                 final List<Translog.Operation> operations,
@@ -3340,7 +3340,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException {
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         // Shard is still inactive since we haven't started recovering yet
         assertFalse(replica.isActive());
-        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener) {
+        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener, threadPool) {
             @Override
             public void indexTranslogOperations(
                 final List<Translog.Operation> operations,
@@ -3397,7 +3397,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
         DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         assertListenerCalled.accept(replica);
-        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener) {
+        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, recoveryListener, threadPool) {
             // we're only checking that listeners are called when the engine is open, before there is no point
             @Override
             public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
index 4f5cad70fd643..85864eebd6d0d 100644
--- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
@@ -86,7 +86,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
             );
             shards.addReplica(newReplicaShard);
             AtomicBoolean assertDone = new AtomicBoolean(false);
-            shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
+            shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, threadPool) {
                 @Override
                 public IndexShard indexShard() {
                     IndexShard idxShard = super.indexShard();
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index 7caff3e5f5479..e93d266dcab4c 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -256,7 +256,7 @@ public void onDone(ReplicationState state) {
                     public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
                         assertEquals(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), "Expected failure");
                     }
-                }),
+                }, threadPool),
                 true,
                 true,
                 replicatePrimaryFunction
diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java
index a83e737dc25c1..7ff4c3ecf5236 100644
--- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java
@@ -219,8 +219,9 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting
             new ByteSizeValue(8, ByteSizeUnit.KB),
             new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES)
         );
-
-        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
+        // To simulate a remote-store-backed node
+        Settings nodeSettings = Settings.builder().put("node.attr.remote_store.translog.repository", "my-repo-1").build();
+        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings, nodeSettings);
         return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "");
     }
 
diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java
index 5e6398da6fa1b..0e16e81b1bb70 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java
@@ -149,22 +149,26 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
             newRouting = newRouting.moveToUnassigned(unassignedInfo)
                 .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
             newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
+            final DiscoveryNode localNode = new DiscoveryNode(
+                "foo",
+                buildNewFakeTransportAddress(),
+                emptyMap(),
+                emptySet(),
+                Version.CURRENT
+            );
             IndexShard shard = index.createShard(
                 newRouting,
                 s -> {},
                 RetentionLeaseSyncer.EMPTY,
                 SegmentReplicationCheckpointPublisher.EMPTY,
+                null,
+                null,
+                localNode,
                 null
             );
             IndexShardTestCase.updateRoutingEntry(shard, newRouting);
             assertEquals(5, counter.get());
-            final DiscoveryNode localNode = new DiscoveryNode(
-                "foo",
-                buildNewFakeTransportAddress(),
-                emptyMap(),
-                emptySet(),
-                Version.CURRENT
-            );
+
             shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null));
             IndexShardTestCase.recoverFromStore(shard);
             newRouting = ShardRoutingHelper.moveToStarted(newRouting);
diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 34f854cae56ba..1e6cc43703672 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -95,7 +95,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
         final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId());
         final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
         targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
-        final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null);
+        final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, threadPool);
         final PlainActionFuture<Void> receiveFileInfoFuture = new PlainActionFuture<>();
         recoveryTarget.receiveFileInfo(
             mdFiles.stream().map(StoreFileMetadata::name).collect(Collectors.toList()),
@@ -355,7 +355,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception {
         shard.prepareForIndexRecovery();
         long startingSeqNo = shard.recoverLocallyAndFetchStartSeqNo(true);
         shard.store().markStoreCorrupted(new IOException("simulated"));
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, threadPool);
         StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo);
         assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(request.metadataSnapshot().size(), equalTo(0));
@@ -396,7 +396,7 @@ public void testResetStartRequestIfTranslogIsCorrupted() throws Exception {
         shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
         shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode));
         shard.prepareForIndexRecovery();
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, threadPool);
         StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
             logger,
             rNode,
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
index ad90255a3cc3f..71d89e2856c6e 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
@@ -137,7 +137,8 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
                     indexShard,
                     node,
                     recoveryListener,
-                    logger
+                    logger,
+                    threadPool
                 )
             );
             recoveryBlocked.await();
@@ -348,7 +349,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
         }
         IndexShard replicaShard = newShard(primaryShard.shardId(), false);
         updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetadata());
-        recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
+        recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, threadPool) {
             @Override
             public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
                 super.prepareForTranslogOperations(totalTranslogOps, listener);
@@ -480,7 +481,7 @@ public void onDone(ReplicationState state) {
                     public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
                         assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
                     }
-                }))
+                }, threadPool))
             );
             expectThrows(AlreadyClosedException.class, () -> replica.refresh("test"));
             group.removeReplica(replica);
diff --git a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java
index fb4dc97435512..4ce4e28690697 100644
--- a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java
+++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java
@@ -225,6 +225,6 @@ long startRecovery(
         final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId());
         indexShard.markAsRecovering("remote", new RecoveryState(indexShard.routingEntry(), sourceNode, rNode));
         indexShard.prepareForIndexRecovery();
-        return collection.start(new RecoveryTarget(indexShard, sourceNode, listener), timeValue);
+        return collection.start(new RecoveryTarget(indexShard, sourceNode, listener, threadPool), timeValue);
     }
 }
diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
index 9800782272ede..e6e20ce8f8566 100644
--- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
@@ -520,7 +520,7 @@ public synchronized boolean removeReplica(IndexShard replica) throws IOException
         }
 
         public void recoverReplica(IndexShard replica) throws IOException {
-            recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener));
+            recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, threadPool));
         }
 
         public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index bf1c4d4c94e04..a2f9eb677c0ac 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -617,7 +617,14 @@ protected IndexShard newShard(
         @Nullable Path remotePath,
         IndexingOperationListener... listeners
     ) throws IOException {
-        final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
+        Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
+        // To simulate a remote-store-backed node
+        if ("true".equals(indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED))) {
+            nodeSettings = Settings.builder()
+                .put("node.name", routing.currentNodeId())
+                .put("node.attr.remote_store.translog.repository", "seg_repo")
+                .build();
+        }
         final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings);
         final IndexShard indexShard;
         if (storeProvider == null) {
@@ -646,7 +653,7 @@ protected IndexShard newShard(
             RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null;
             RepositoriesService mockRepoSvc = mock(RepositoriesService.class);
 
-            if (indexSettings.isRemoteStoreEnabled()) {
+            if (indexSettings.isRemoteStoreEnabled() || indexSettings.isRemoteNode()) {
                 String remoteStoreRepository = indexSettings.getRemoteStoreRepository();
                 // The remote path is configured via a repository. This is a hack used when shards are created using reset,
                 // since we can't get the remote path from IndexShard directly; we use the repository to store it.
@@ -703,7 +710,8 @@ protected IndexShard newShard(
                 remoteStoreStatsTrackerFactory,
                 () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
                 "dummy-node",
-                DefaultRecoverySettings.INSTANCE
+                DefaultRecoverySettings.INSTANCE,
+                false
             );
             indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             if (remoteStoreStatsTrackerFactory != null) {
@@ -1001,7 +1009,7 @@ public static void updateRoutingEntry(IndexShard shard, ShardRouting shardRoutin
     protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) throws IOException {
         IndexShard primary = null;
         try {
-            primary = newStartedShard(true);
+            primary = newStartedShard(true, replica.indexSettings.getSettings());
             recoverReplica(replica, primary, startReplica);
         } finally {
             closeShards(primary);
@@ -1033,7 +1041,7 @@ protected void recoverReplica(
         recoverReplica(
             replica,
             primary,
-            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener),
+            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, threadPool),
             true,
             startReplica,
             replicatePrimaryFunction
@@ -1051,7 +1059,7 @@ protected void recoverReplica(
     }
 
     public Function<List<IndexShard>, List<SegmentReplicationTarget>> getReplicationFunc(final IndexShard target) {
-        return target.indexSettings().isSegRepEnabled() ? (shardList) -> {
+        return target.indexSettings().isSegRepEnabledOrRemoteNode() ? (shardList) -> {
             try {
                 assert shardList.size() >= 2;
                 final IndexShard primary = shardList.get(0);
@@ -1489,7 +1497,7 @@ private SegmentReplicationTargetService prepareForReplication(
 
         SegmentReplicationSourceFactory sourceFactory = null;
         SegmentReplicationTargetService targetService;
-        if (primaryShard.indexSettings.isRemoteStoreEnabled()) {
+        if (primaryShard.indexSettings.isRemoteStoreEnabled() || primaryShard.indexSettings.isRemoteNode()) {
             RecoverySettings recoverySettings = new RecoverySettings(
                 Settings.EMPTY,
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)