Skip to content

Commit

Permalink
Add safeguard limits for file cache during node level allocation
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Jun 21, 2023
1 parent 9c5c6eb commit 840d282
Show file tree
Hide file tree
Showing 16 changed files with 389 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
Expand Down Expand Up @@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}

final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));

ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
ClusterState state = clusterService.state();
for (ShardRouting shard : state.routingTable().allShards()) {
Expand All @@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
}
}

public void testClusterInfoServiceCollectsFileCacheInformation() {
internalCluster().startNodes(1);
internalCluster().ensureAtLeastNumSearchAndDataNodes(2);

InternalTestCluster internalTestCluster = internalCluster();
// Get the cluster info service on the cluster-manager node
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(
ClusterInfoService.class,
internalTestCluster.getClusterManagerName()
);
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
ClusterInfo info = infoService.refresh();
assertNotNull("info should not be null", info);
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is enabled on both search nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));

for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
}
}

public void testClusterInfoServiceInformationClearOnError() {
internalCluster().startNodes(
2,
Expand Down
24 changes: 22 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.Version;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -63,9 +65,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -83,13 +86,15 @@ public ClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -105,6 +110,11 @@ public ClusterInfo(StreamInput in) throws IOException {
this.shardSizes = Collections.unmodifiableMap(sizeMap);
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
} else {
this.nodeFileCacheStats = Map.of();
}
}

@Override
Expand All @@ -114,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.shardSizes, StreamOutput::writeString, (o, v) -> out.writeLong(v == null ? -1 : v));
out.writeMap(this.routingToDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -187,6 +200,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
}

/**
* Returns a node id to file cache stats mapping for the nodes that have search roles assigned to it.
*/
public Map<String, FileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand Down Expand Up @@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
Expand All @@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
Expand Down Expand Up @@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
);
}

Expand All @@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
nodesStatsRequest.timeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
Expand Down Expand Up @@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
);
leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);

nodeFileCacheStats = Collections.unmodifiableMap(
nodesStatsResponse.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isSearchNode())
.collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -329,6 +330,21 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
return shards;
}

/**
* Determine the shards that satisfy the predicate
* @param predicate shard routing predicate which needs to be met
* @return List of shards
*/
public List<ShardRouting> shardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
List<ShardRouting> shards = new ArrayList<>();
for (ShardRouting shardEntry : this) {
if (predicate.test(shardEntry)) {
shards.add(shardEntry);
}
}
return shards;
}

/**
* Determine the shards of an index with a specific state
* @param index id of the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
}
return LOCAL_ONLY;
}

public static boolean isShardAndNodePoolRemote(ShardRouting shardRouting, RoutingAllocation allocation, RoutingNode node) {
return REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
Expand All @@ -54,14 +55,17 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -167,6 +171,42 @@ public static long sizeOfRelocatingShards(
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();

/*
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (RoutingPool.isShardAndNodePoolRemote(shardRouting, allocation, node)) {
final Predicate<ShardRouting> shardRoutingPredicate = shard -> shard.primary()
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation));
final List<ShardRouting> remoteShardsOnNode = node.shardsSatisfyingPredicate(shardRoutingPredicate);
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
.map(ShardRouting::getExpectedShardSize)
.mapToLong(Long::longValue)
.sum();

final long shardSize = getExpectedShardSize(
shardRouting,
0L,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);

final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
);
}
return Decision.YES;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
Expand Down Expand Up @@ -422,6 +462,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}

/*
The following block prevents movement for remote shards since they do not use the local storage as
the primary source of data storage.
*/
if (RoutingPool.isShardAndNodePoolRemote(shardRouting, allocation, node)) {
return Decision.ALWAYS;
}

final ClusterInfo clusterInfo = allocation.clusterInfo();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

// TODO: Convert the constant into an integer setting
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
Expand All @@ -49,7 +50,8 @@ public void testSerialization() throws Exception {
randomDiskUsage(),
randomShardSizes(),
randomRoutingToDataPath(),
randomReservedSpace()
randomReservedSpace(),
randomFileCacheStats()
);
BytesStreamOutput output = new BytesStreamOutput();
clusterInfo.writeTo(output);
Expand All @@ -60,6 +62,7 @@ public void testSerialization() throws Exception {
assertEquals(clusterInfo.shardSizes, result.shardSizes);
assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
assertEquals(clusterInfo.getNodeFileCacheStats().size(), result.getNodeFileCacheStats().size());
}

private static Map<String, DiskUsage> randomDiskUsage() {
Expand All @@ -79,6 +82,25 @@ private static Map<String, DiskUsage> randomDiskUsage() {
return builder;
}

private static Map<String, FileCacheStats> randomFileCacheStats() {
int numEntries = randomIntBetween(0, 16);
final Map<String, FileCacheStats> builder = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
String key = randomAlphaOfLength(16);
FileCacheStats fileCacheStats = new FileCacheStats(
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong()
);
builder.put(key, fileCacheStats);
}
return builder;
}

private static Map<String, Long> randomShardSizes() {
int numEntries = randomIntBetween(0, 128);
final Map<String, Long> builder = new HashMap<>(numEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public void testShardsWithState() {
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
}

public void testShardsWithPredicate() {
assertThat(routingNode.shardsSatisfyingPredicate(shard -> true).size(), equalTo(3));
assertThat(routingNode.shardsSatisfyingPredicate(shard -> false).size(), equalTo(0));
assertThat(routingNode.shardsSatisfyingPredicate(ShardRouting::primary).size(), equalTo(0));
}

public void testShardsWithStateInIndex() {
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1));
Expand Down
Loading

0 comments on commit 840d282

Please sign in to comment.