From 08908289b7beea0236f21b54d33e018f3168f9a2 Mon Sep 17 00:00:00 2001
From: Kunal Kotwani <kkotwani@amazon.com>
Date: Mon, 10 Jul 2023 09:43:19 -0700
Subject: [PATCH] Add safeguard limits for file cache during node level
 allocation (#8208)

Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
---
 CHANGELOG.md                                  |   1 +
 .../cluster/ClusterInfoServiceIT.java         |  28 +++
 .../org/opensearch/cluster/ClusterInfo.java   |  24 +-
 .../cluster/InternalClusterInfoService.java   |  15 +-
 .../decider/DiskThresholdDecider.java         |  52 +++++
 .../store/remote/filecache/FileCache.java     |   3 +
 .../opensearch/cluster/ClusterInfoTests.java  |  24 +-
 .../allocation/DiskThresholdMonitorTests.java |   2 +-
 ...dexShardConstraintDeciderOverlapTests.java |   2 +-
 .../RemoteShardsBalancerBaseTestCase.java     |   2 +-
 .../decider/DiskThresholdDeciderTests.java    | 205 +++++++++++++++++-
 .../DiskThresholdDeciderUnitTests.java        |  13 +-
 .../MockInternalClusterInfoService.java       |   3 +-
 .../opensearch/test/OpenSearchTestCase.java   |   8 +
 14 files changed, 367 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 929431bba24d5..ecb92a4051738 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -158,6 +158,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))
 - Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312))
 - Optimize Metadata build() to skip redundant computations as part of ClusterState build ([#7853](https://github.com/opensearch-project/OpenSearch/pull/7853))
+- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))
 
 ### Deprecated
 
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java
index 17e8526acfd74..508b8e21e42c1 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java
@@ -50,6 +50,7 @@
 import org.opensearch.core.common.Strings;
 import org.opensearch.index.IndexService;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.index.store.Store;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.SystemIndexDescriptor;
@@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
             logger.info("--> shard size: {}", size);
             assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
         }
+
+        final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
+        assertNotNull(nodeFileCacheStats);
+        assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));
+
         ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
         ClusterState state = clusterService.state();
         for (ShardRouting shard : state.routingTable().allShards()) {
@@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
         }
     }
 
+    public void testClusterInfoServiceCollectsFileCacheInformation() {
+        internalCluster().startNodes(1);
+        internalCluster().ensureAtLeastNumSearchAndDataNodes(2);
+
+        InternalTestCluster internalTestCluster = internalCluster();
+        // Get the cluster info service on the cluster-manager node
+        final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(
+            ClusterInfoService.class,
+            internalTestCluster.getClusterManagerName()
+        );
+        infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
+        ClusterInfo info = infoService.refresh();
+        assertNotNull("info should not be null", info);
+        final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
+        assertNotNull(nodeFileCacheStats);
+        assertThat("file cache is enabled on both search nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));
+
+        for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
+            assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
+        }
+    }
+
     public void testClusterInfoServiceInformationClearOnError() {
         internalCluster().startNodes(
             2,
diff --git a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java
index 876a36c205975..ffa3d0d19fb71 100644
--- a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java
+++ b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java
@@ -34,6 +34,7 @@
 
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
+import org.opensearch.Version;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
@@ -42,6 +43,7 @@
 import org.opensearch.core.xcontent.ToXContentFragment;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -63,9 +65,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
     public static final ClusterInfo EMPTY = new ClusterInfo();
     final Map<ShardRouting, String> routingToDataPath;
     final Map<NodeAndPath, ReservedSpace> reservedSpace;
+    final Map<String, FileCacheStats> nodeFileCacheStats;
 
     protected ClusterInfo() {
-        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
+        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }
 
     /**
@@ -83,13 +86,15 @@ public ClusterInfo(
         final Map<String, DiskUsage> mostAvailableSpaceUsage,
         final Map<String, Long> shardSizes,
         final Map<ShardRouting, String> routingToDataPath,
-        final Map<NodeAndPath, ReservedSpace> reservedSpace
+        final Map<NodeAndPath, ReservedSpace> reservedSpace,
+        final Map<String, FileCacheStats> nodeFileCacheStats
     ) {
         this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
         this.shardSizes = shardSizes;
         this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
         this.routingToDataPath = routingToDataPath;
         this.reservedSpace = reservedSpace;
+        this.nodeFileCacheStats = nodeFileCacheStats;
     }
 
     public ClusterInfo(StreamInput in) throws IOException {
@@ -105,6 +110,11 @@ public ClusterInfo(StreamInput in) throws IOException {
         this.shardSizes = Collections.unmodifiableMap(sizeMap);
         this.routingToDataPath = Collections.unmodifiableMap(routingMap);
         this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
+        if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+            this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
+        } else {
+            this.nodeFileCacheStats = Map.of();
+        }
     }
 
     @Override
@@ -114,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeMap(this.shardSizes, StreamOutput::writeString, (o, v) -> out.writeLong(v == null ? -1 : v));
         out.writeMap(this.routingToDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
         out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
+        if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+            out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
+        }
     }
 
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -187,6 +200,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
         return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
     }
 
+    /**
+     * Returns a node id to file cache stats mapping for the nodes that have search roles assigned to them.
+     */
+    public Map<String, FileCacheStats> getNodeFileCacheStats() {
+        return Collections.unmodifiableMap(this.nodeFileCacheStats);
+    }
+
     /**
      * Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
      */
diff --git a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java
index 0acc7bece439f..9c12d6bb3e7ea 100644
--- a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java
+++ b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java
@@ -59,6 +59,7 @@
 import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.index.store.StoreStats;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.monitor.fs.FsInfo;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.ReceiveTimeoutTransportException;
@@ -72,6 +73,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * InternalClusterInfoService provides the ClusterInfoService interface,
@@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
 
     private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
     private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
+    private volatile Map<String, FileCacheStats> nodeFileCacheStats;
     private volatile IndicesStatsSummary indicesStatsSummary;
     // null if this node is not currently the cluster-manager
     private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
@@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
     public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
         this.leastAvailableSpaceUsages = Map.of();
         this.mostAvailableSpaceUsages = Map.of();
+        this.nodeFileCacheStats = Map.of();
         this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
         this.threadPool = threadPool;
         this.client = client;
@@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
             mostAvailableSpaceUsages,
             indicesStatsSummary.shardSizes,
             indicesStatsSummary.shardRoutingToDataPath,
-            indicesStatsSummary.reservedSpace
+            indicesStatsSummary.reservedSpace,
+            nodeFileCacheStats
         );
     }
 
@@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
         final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
         nodesStatsRequest.clear();
         nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
+        nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
         nodesStatsRequest.timeout(fetchTimeout);
         client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
         return latch;
@@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
                 );
                 leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
                 mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);
+
+                nodeFileCacheStats = Collections.unmodifiableMap(
+                    nodesStatsResponse.getNodes()
+                        .stream()
+                        .filter(nodeStats -> nodeStats.getNode().isSearchNode())
+                        .collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
+                );
             }
 
             @Override
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index ddd5e9274f08b..e216ca4511bff 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -54,14 +54,21 @@
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.index.Index;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.snapshots.SnapshotShardSizeInfo;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
+import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
+import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
 import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
 import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
+import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;
 
 /**
  * The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -167,6 +174,42 @@ public static long sizeOfRelocatingShards(
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         ClusterInfo clusterInfo = allocation.clusterInfo();
+
+        /*
+         The following block enables allocation for remote shards within safeguard limits of the filecache.
+         */
+        if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
+            final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
+                .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
+                .collect(Collectors.toList());
+            final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
+                .map(ShardRouting::getExpectedShardSize)
+                .mapToLong(Long::longValue)
+                .sum();
+
+            final long shardSize = getExpectedShardSize(
+                shardRouting,
+                0L,
+                allocation.clusterInfo(),
+                allocation.snapshotShardSizeInfo(),
+                allocation.metadata(),
+                allocation.routingTable()
+            );
+
+            final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
+            final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
+            final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
+
+            if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
+                return allocation.decision(
+                    Decision.NO,
+                    NAME,
+                    "file cache limit reached - remote shard size will exceed configured safeguard ratio"
+                );
+            }
+            return Decision.YES;
+        }
+
         Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
         final Decision decision = earlyTerminate(allocation, usages);
         if (decision != null) {
@@ -422,6 +465,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
         if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
             throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
         }
+
+        /*
+         The following block prevents movement for remote shards since they do not use the local storage as
+         the primary source of data storage.
+         */
+        if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
+            return Decision.ALWAYS;
+        }
+
         final ClusterInfo clusterInfo = allocation.clusterInfo();
         final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
         final Decision decision = earlyTerminate(allocation, usages);
diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java
index 0aa3740fb6ecb..3d23b4d22538c 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java
@@ -49,6 +49,9 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
 
     private final CircuitBreaker circuitBreaker;
 
+    // TODO: Convert the constant into an integer setting
+    public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;
+
     public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
         this.theCache = cache;
         this.circuitBreaker = circuitBreaker;
diff --git a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java
index a32d6e35d0182..e1294da1e57bc 100644
--- a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java
+++ b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java
@@ -36,6 +36,7 @@
 import org.opensearch.cluster.routing.TestShardRouting;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.util.HashMap;
@@ -49,7 +50,8 @@ public void testSerialization() throws Exception {
             randomDiskUsage(),
             randomShardSizes(),
             randomRoutingToDataPath(),
-            randomReservedSpace()
+            randomReservedSpace(),
+            randomFileCacheStats()
         );
         BytesStreamOutput output = new BytesStreamOutput();
         clusterInfo.writeTo(output);
@@ -60,6 +62,7 @@ public void testSerialization() throws Exception {
         assertEquals(clusterInfo.shardSizes, result.shardSizes);
         assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
         assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
+        assertEquals(clusterInfo.getNodeFileCacheStats().size(), result.getNodeFileCacheStats().size());
     }
 
     private static Map<String, DiskUsage> randomDiskUsage() {
@@ -79,6 +82,25 @@ private static Map<String, DiskUsage> randomDiskUsage() {
         return builder;
     }
 
+    private static Map<String, FileCacheStats> randomFileCacheStats() {
+        int numEntries = randomIntBetween(0, 16);
+        final Map<String, FileCacheStats> builder = new HashMap<>(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            String key = randomAlphaOfLength(16);
+            FileCacheStats fileCacheStats = new FileCacheStats(
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong()
+            );
+            builder.put(key, fileCacheStats);
+        }
+        return builder;
+    }
+
     private static Map<String, Long> randomShardSizes() {
         int numEntries = randomIntBetween(0, 128);
         final Map<String, Long> builder = new HashMap<>(numEntries);
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index 21d891bdbc317..3e21f6c19e150 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -798,7 +798,7 @@ private static ClusterInfo clusterInfo(
         final Map<String, DiskUsage> diskUsages,
         final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
     ) {
-        return new ClusterInfo(diskUsages, null, null, null, reservedSpace);
+        return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of());
     }
 
 }
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
index 7112af6b4efc0..15dcae65ce6e7 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
@@ -176,7 +176,7 @@ public DevNullClusterInfo(
             final Map<String, Long> shardSizes,
             final Map<NodeAndPath, ReservedSpace> reservedSpace
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of());
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java
index 9d7d0ebc5b2b1..dbb08a999877d 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java
@@ -239,7 +239,7 @@ public DevNullClusterInfo(
             final Map<String, DiskUsage> mostAvailableSpaceUsage,
             final Map<String, Long> shardSizes
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of());
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of());
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
index c23d98c95fc3c..4ccf0a9bc3a20 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
@@ -70,6 +70,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.Index;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.repositories.IndexId;
 import org.opensearch.snapshots.EmptySnapshotsInfoService;
 import org.opensearch.snapshots.InternalSnapshotsInfoService.SnapshotShard;
@@ -83,6 +84,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
@@ -283,6 +285,190 @@ public void testDiskThreshold() {
         assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1));
     }
 
+    public void testDiskThresholdForRemoteShards() {
+        Settings diskSettings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8)
+            .build();
+
+        Map<String, DiskUsage> usages = new HashMap<>();
+        usages.put("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 10)); // 90% used
+        usages.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 35)); // 65% used
+        usages.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 60)); // 40% used
+
+        Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test][0][p]", 10L); // 10 bytes
+        shardSizes.put("[test][0][r]", 10L);
+
+        Map<String, FileCacheStats> fileCacheStatsMap = new HashMap<>();
+        fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0));
+        fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0));
+        fileCacheStatsMap.put("node3", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0));
+        final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap);
+
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        AllocationDeciders deciders = new AllocationDeciders(
+            new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), makeDecider(diskSettings)))
+        );
+
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
+        };
+        AllocationService strategy = new AllocationService(
+            deciders,
+            new TestGatewayAllocator(),
+            new BalancedShardsAllocator(Settings.EMPTY),
+            cis,
+            EmptySnapshotsInfoService.INSTANCE
+        );
+
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+            .build();
+
+        final RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(initialRoutingTable)
+            .build();
+
+        Set<DiscoveryNodeRole> defaultWithSearchRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES);
+        defaultWithSearchRole.add(DiscoveryNodeRole.SEARCH_ROLE);
+
+        logger.info("--> adding two nodes");
+        clusterState = ClusterState.builder(clusterState)
+            .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithSearchRole)).add(newNode("node2", defaultWithSearchRole)))
+            .build();
+        clusterState = strategy.reroute(clusterState, "reroute");
+        logShardStates(clusterState);
+
+        // Both the primary and the replica should be initializing, since each node's file cache has enough headroom
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
+
+        logger.info("--> start the shards (primaries)");
+        clusterState = startInitializingShardsAndReroute(strategy, clusterState);
+
+        logShardStates(clusterState);
+        // Assert that we're able to start both the primary and the replica
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
+
+        logger.info("--> adding node3");
+
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build();
+        clusterState = strategy.reroute(clusterState, "reroute");
+
+        logShardStates(clusterState);
+        // Assert that adding node3 changes nothing: both shards stay started and no shard is initializing
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0));
+
+        logger.info("--> start the shards (replicas)");
+        clusterState = startInitializingShardsAndReroute(strategy, clusterState);
+
+        logShardStates(clusterState);
+        // Assert that both shards remain started on node1 and node2, and that nothing was placed on node3
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
+    }
+
+    public void testFileCacheRemoteShardsDecisions() {
+        Settings diskSettings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%")
+            .build();
+
+        // We have an index with 2 primary shards each taking 40 bytes. Each node has a total disk size of 100 bytes
+        final Map<String, DiskUsage> usages = new HashMap<>();
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 20)); // 80% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 100)); // 0% used
+
+        final Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test][0][p]", 40L);
+        shardSizes.put("[test][1][p]", 40L);
+        shardSizes.put("[foo][0][p]", 10L);
+
+        // First node has filecache size as 0, second has 1000, greater than the shard sizes.
+        Map<String, FileCacheStats> fileCacheStatsMap = new HashMap<>();
+        fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 0, 0, 0, 0, 0));
+        fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0));
+
+        final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap);
+
+        Set<DiscoveryNodeRole> defaultWithSearchRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES);
+        defaultWithSearchRole.add(DiscoveryNodeRole.SEARCH_ROLE);
+
+        DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
+            .build();
+
+        RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
+
+        DiscoveryNode discoveryNode1 = new DiscoveryNode(
+            "node1",
+            buildNewFakeTransportAddress(),
+            emptyMap(),
+            defaultWithSearchRole,
+            Version.CURRENT
+        );
+        DiscoveryNode discoveryNode2 = new DiscoveryNode(
+            "node2",
+            buildNewFakeTransportAddress(),
+            emptyMap(),
+            defaultWithSearchRole,
+            Version.CURRENT
+        );
+        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(discoveryNode1).add(discoveryNode2).build();
+
+        ClusterState baseClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(initialRoutingTable)
+            .nodes(discoveryNodes)
+            .build();
+
+        // Both 40-byte primaries are started on node1, which is 80% full on disk and has a zero-sized file cache
+        ShardRouting firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
+        ShardRouting secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", null, true, ShardRoutingState.STARTED);
+        RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting);
+        RoutingNode secondRoutingNode = new RoutingNode("node2", discoveryNode2);
+
+        RoutingTable.Builder builder = RoutingTable.builder()
+            .add(
+                IndexRoutingTable.builder(firstRouting.index())
+                    .addIndexShard(new IndexShardRoutingTable.Builder(firstRouting.shardId()).addShard(firstRouting).build())
+                    .addIndexShard(new IndexShardRoutingTable.Builder(secondRouting.shardId()).addShard(secondRouting).build())
+            );
+        ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
+        RoutingAllocation routingAllocation = new RoutingAllocation(
+            null,
+            new RoutingNodes(clusterState),
+            clusterState,
+            clusterInfo,
+            null,
+            System.nanoTime()
+        );
+        routingAllocation.debugDecision(true);
+        Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+
+        decision = diskThresholdDecider.canAllocate(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.NO));
+
+        assertThat(
+            ((Decision.Single) decision).getExplanation(),
+            containsString("file cache limit reached - remote shard size will exceed configured safeguard ratio")
+        );
+
+        decision = diskThresholdDecider.canAllocate(firstRouting, secondRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+    }
+
     public void testDiskThresholdWithAbsoluteSizes() {
         Settings diskSettings = Settings.builder()
             .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
@@ -863,7 +1049,8 @@ public void testShardRelocationsTakenIntoAccount() {
                     Map.of(
                         new ClusterInfo.NodeAndPath("node1", "/dev/null"),
                         new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), between(51, 200)).build()
-                    )
+                    ),
+                    Map.of()
                 )
             );
             clusterState = applyStartedShardsUntilNoChange(clusterState, strategy);
@@ -1455,16 +1642,26 @@ static class DevNullClusterInfo extends ClusterInfo {
             final Map<String, DiskUsage> mostAvailableSpaceUsage,
             final Map<String, Long> shardSizes
         ) {
-            this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of());
+            this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of());
+        }
+
+        DevNullClusterInfo(
+            final Map<String, DiskUsage> leastAvailableSpaceUsage,
+            final Map<String, DiskUsage> mostAvailableSpaceUsage,
+            final Map<String, Long> shardSizes,
+            final Map<String, FileCacheStats> nodeFileCacheStats
+        ) { // overload without reserved space: delegates with an empty reserved-space map
+            this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), nodeFileCacheStats);
         }
 
         DevNullClusterInfo(
             final Map<String, DiskUsage> leastAvailableSpaceUsage,
             final Map<String, DiskUsage> mostAvailableSpaceUsage,
             final Map<String, Long> shardSizes,
-            Map<NodeAndPath, ReservedSpace> reservedSpace
+            Map<NodeAndPath, ReservedSpace> reservedSpace,
+            final Map<String, FileCacheStats> nodeFileCacheStats // handed to the ClusterInfo super constructor as-is
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, nodeFileCacheStats);
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
index caab381e65e84..62c52e93aad33 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
@@ -127,7 +127,7 @@ public void testCanAllocateUsesMaxAvailableSpace() {
 
         final Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
-        final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of());
+        final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of());
         RoutingAllocation allocation = new RoutingAllocation(
             new AllocationDeciders(Collections.singleton(decider)),
             clusterState.getRoutingNodes(),
@@ -203,7 +203,7 @@ public void testCannotAllocateDueToLackOfDiskResources() {
         // way bigger than available space
         final long shardSize = randomIntBetween(110, 1000);
         shardSizes.put("[test][0][p]", shardSize);
-        ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of());
+        ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of());
         RoutingAllocation allocation = new RoutingAllocation(
             new AllocationDeciders(Collections.singleton(decider)),
             clusterState.getRoutingNodes(),
@@ -320,7 +320,14 @@ public void testCanRemainUsesLeastAvailableSpace() {
         shardSizes.put("[test][1][p]", 10L);
         shardSizes.put("[test][2][p]", 10L);
 
-        final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, shardRoutingMap, Map.of());
+        final ClusterInfo clusterInfo = new ClusterInfo(
+            leastAvailableUsages,
+            mostAvailableUsage,
+            shardSizes,
+            shardRoutingMap,
+            Map.of(),
+            Map.of()
+        );
         RoutingAllocation allocation = new RoutingAllocation(
             new AllocationDeciders(Collections.singleton(decider)),
             clusterState.getRoutingNodes(),
diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
index 6634d1b4dbafc..6354cf18e8b62 100644
--- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
+++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
@@ -132,7 +132,8 @@ class SizeFakingClusterInfo extends ClusterInfo {
                 delegate.getNodeMostAvailableDiskUsages(),
                 delegate.shardSizes,
                 delegate.routingToDataPath,
-                delegate.reservedSpace
+                delegate.reservedSpace,
+                delegate.nodeFileCacheStats
             );
         }
 
diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
index 7722b59313b5f..ec397a2baa640 100644
--- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
@@ -109,6 +109,7 @@
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.env.TestEnvironment;
 import org.opensearch.index.Index;
+import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.analysis.AnalysisRegistry;
 import org.opensearch.index.analysis.AnalyzerScope;
@@ -1200,6 +1201,13 @@ public static Settings.Builder settings(Version version) {
         return builder;
     }
 
+    /** Returns a settings builder carrying the given created-version and the remote snapshot store type. */
+    public static Settings.Builder remoteIndexSettings(Version version) {
+        return Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, version)
+            .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey());
+    }
+
     /**
      * Returns size random values
      */