Skip to content

Commit

Permalink
[Bugfix] Fix cache maximum size settings not working properly with pl…
Browse files Browse the repository at this point in the history
…uggable caching (opensearch-project#16636)

* Fix cache size setting

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Changelog

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Deprecate original IRC size setting

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* spotlessApply

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Addressed Ankit's comments

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Address Sagar's comment

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

---------

Signed-off-by: Peter Alfonsi <petealft@amazon.com>
Signed-off-by: Peter Alfonsi <peter.alfonsi@gmail.com>
Signed-off-by: Ankit Jain <akjain@amazon.com>
Co-authored-by: Peter Alfonsi <petealft@amazon.com>
Co-authored-by: Ankit Jain <akjain@amazon.com>
  • Loading branch information
3 people authored Jan 16, 2025
1 parent 13159c1 commit a436076
Show file tree
Hide file tree
Showing 15 changed files with 403 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix case insensitive and escaped query on wildcard ([#16827](https://github.com/opensearch-project/OpenSearch/pull/16827))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Fix max request cache size settings not working properly with pluggable caching ([#16636](https://github.com/opensearch-project/OpenSearch/pull/16636))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))
- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))
- Fix multi-value sort for unsigned long ([#16732](https://github.com/opensearch-project/OpenSearch/pull/16732))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {

private final TieredSpilloverCacheStatsHolder statsHolder;

private final long onHeapCacheMaxWeight;
private final long diskCacheMaxWeight;

/**
* This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
* only once.
Expand Down Expand Up @@ -218,6 +221,8 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
cacheListMap.put(diskCache, new TierInfo(isDiskCacheEnabled, TIER_DIMENSION_VALUE_DISK));
this.caches = Collections.synchronizedMap(cacheListMap);
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
this.onHeapCacheMaxWeight = onHeapCacheSizeInBytes;
this.diskCacheMaxWeight = diskCacheSizeInBytes;
}

// Package private for testing
Expand Down Expand Up @@ -526,6 +531,16 @@ void updateStatsOnPut(String destinationTierValue, ICacheKey<K> key, V value) {
statsHolder.incrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
}

// pkg-private for testing
long getOnHeapCacheMaxWeight() {
return onHeapCacheMaxWeight;
}

// pkg-private for testing
long getDiskCacheMaxWeight() {
return diskCacheMaxWeight;
}

/**
* A class which receives removal events from the heap tier.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the onHeap cache size to be used within tiered cache.
* This setting overrides size settings from the heap tier implementation.
* For example, if OpenSearchOnHeapCache is the heap tier in the request cache, and
* indices.requests.cache.opensearch_onheap.size is set, that value will be ignored in favor of this setting.
*
* Pattern: {cache_type}.tiered_spillover.onheap.store.size
* Example: indices.request.cache.tiered_spillover.onheap.store.size
Expand All @@ -96,6 +99,9 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the disk cache size to be used within tiered cache.
* This setting overrides the size setting from the disk tier implementation.
* For example, if EhcacheDiskCache is the disk tier in the request cache, and
* indices.requests.cache.ehcache_disk.max_size_in_bytes is set, that value will be ignored in favor of this setting.
*/
public static final Setting.AffixSetting<Long> TIERED_SPILLOVER_DISK_STORE_SIZE = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void close() {

}

long getMaximumWeight() {
return maxSize;
}

public static class MockDiskCacheFactory implements Factory {

public static final String NAME = "mockDiskCache";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

import static org.opensearch.cache.common.tier.TieredSpilloverCache.ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.MIN_DISK_CACHE_SIZE_IN_BYTES;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
Expand Down Expand Up @@ -2166,6 +2167,134 @@ public void testDropStatsForDimensions() throws Exception {
assertEquals(new ImmutableCacheStats(0, 0, 0, 0, 0), tieredSpilloverCache.stats().getTotalStats());
}

public void testSegmentSizesWhenUsingFactory() {
// The TSC's tier size settings, TIERED_SPILLOVER_ONHEAP_STORE_SIZE and TIERED_SPILLOVER_DISK_STORE_SIZE,
// should always be respected, overriding the individual implementation's size settings if present
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
long heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;
int numSegments = getNumberOfSegments();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
// These two size settings should be honored
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedHeapSize + "b"
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedDiskSize
)
// The size setting from the OpenSearchOnHeap implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()).getKey(),
numSegments
)
.build();
String storagePath = getStoragePath(settings);

TieredSpilloverCache<String, String> tieredSpilloverCache = (TieredSpilloverCache<
String,
String>) new TieredSpilloverCache.TieredSpilloverCacheFactory().create(
new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
// 20_000_000 ns = 20 ms to compute
.setClusterSettings(clusterSettings)
.setStoragePath(storagePath)
.build(),
CacheType.INDICES_REQUEST_CACHE,
Map.of(
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME,
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
MockDiskCache.MockDiskCacheFactory.NAME,
// The size value passed in here acts as the "implementation setting" for the disk tier, and should also be ignored
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, false, keyValueSize)
)
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

public void testSegmentSizesWhenNotUsingFactory() {
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
int heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;

Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
// The size setting from the OpenSearchOnHeapCache implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.build();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
int numSegments = getNumberOfSegments();
CacheConfig<String, String> cacheConfig = getCacheConfig(1, settings, removalListener, numSegments);
TieredSpilloverCache<String, String> tieredSpilloverCache = getTieredSpilloverCache(
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, true, keyValueSize),
cacheConfig,
null,
removalListener,
numSegments,
expectedHeapSize,
expectedDiskSize
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

private void checkSegmentSizes(TieredSpilloverCache<String, String> cache, long expectedHeapSize, long expectedDiskSize) {
TieredSpilloverCache.TieredSpilloverCacheSegment<String, String> segment = cache.tieredSpilloverCacheSegments[0];
assertEquals(expectedHeapSize / cache.getNumberOfSegments(), segment.getOnHeapCacheMaxWeight());
assertEquals(expectedDiskSize / cache.getNumberOfSegments(), segment.getDiskCacheMaxWeight());
}

private List<String> getMockDimensions() {
List<String> dims = new ArrayList<>();
for (String dimensionName : dimensionNames) {
Expand Down Expand Up @@ -2455,9 +2584,9 @@ private void verifyComputeIfAbsentThrowsException(
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class EhcacheDiskCacheSettings {

/**
* Disk cache max size setting.
* If this cache is used as a tier in a TieredSpilloverCache, this setting is ignored.
*/
public static final Setting.AffixSetting<Long> DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_size_in_bytes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ private V deserializeValue(ByteArrayWrapper binary) {
return valueSerializer.deserialize(binary.value);
}

// Pkg-private for testing.
long getMaximumWeight() {
return maxWeightInBytes;
}

/**
* Factory to create an ehcache disk cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -1201,6 +1203,65 @@ public void testEhcacheCloseWithDestroyCacheMethodThrowingException() throws Exc
ehcacheDiskCache.close();
}

public void testWithCacheConfigSizeSettings() throws Exception {
// The cache should get its size from the config if present, and otherwise should get it from the setting.
long maxSizeFromSetting = between(MINIMUM_MAX_SIZE_IN_BYTES + 1000, MINIMUM_MAX_SIZE_IN_BYTES + 2000);
long maxSizeFromConfig = between(MINIMUM_MAX_SIZE_IN_BYTES + 3000, MINIMUM_MAX_SIZE_IN_BYTES + 4000);

EhcacheDiskCache<String, String> cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, false);
assertEquals(maxSizeFromSetting, cache.getMaximumWeight());

cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, true);
assertEquals(maxSizeFromConfig, cache.getMaximumWeight());
}

// Modified from OpenSearchOnHeapCacheTests. Can't reuse, as we can't add a dependency on the server.test module.
private EhcacheDiskCache<String, String> setupMaxSizeTest(long maxSizeFromSetting, long maxSizeFromConfig, boolean putSizeInConfig)
throws Exception {
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) {
Settings settings = Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, true)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_MAX_SIZE_IN_BYTES_KEY)
.getKey(),
maxSizeFromSetting
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(),
env.nodePaths()[0].indicesPath.toString() + "/request_cache/" + 0
)
.build();

CacheConfig.Builder<String, String> cacheConfigBuilder = new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setWeigher(getWeigher())
.setRemovalListener(listener)
.setSettings(settings)
.setDimensionNames(List.of(dimensionName))
.setStatsTrackingEnabled(true);
if (putSizeInConfig) {
cacheConfigBuilder.setMaxSizeInBytes(maxSizeFromConfig);
}

ICache.Factory cacheFactory = new EhcacheDiskCache.EhcacheDiskCacheFactory();
return (EhcacheDiskCache<String, String>) cacheFactory.create(
cacheConfigBuilder.build(),
CacheType.INDICES_REQUEST_CACHE,
null
);
}
}

static class MockEhcahceDiskCache extends EhcacheDiskCache<String, String> {

public MockEhcahceDiskCache(Builder<String, String> builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ public CacheService(Map<String, ICache.Factory> cacheStoreTypeFactories, Setting
}

public <K, V> ICache<K, V> createCache(CacheConfig<K, V> config, CacheType cacheType) {
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
String storeName = getStoreNameFromSetting(cacheType, settings);
if (!pluggableCachingEnabled(cacheType, settings)) {
// Condition 1: In case feature flag is off, we default to onHeap.
// Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older
// settings, so we again fallback to onHeap to maintain backward compatibility.
Expand All @@ -74,4 +71,19 @@ public NodeCacheStats stats(CommonStatsFlags flags) {
}
return new NodeCacheStats(statsMap, flags);
}

/**
* Check if pluggable caching is on, and if a store type is present for this cache type.
*/
public static boolean pluggableCachingEnabled(CacheType cacheType, Settings settings) {
String storeName = getStoreNameFromSetting(cacheType, settings);
return FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) && storeName != null && !storeName.isBlank();
}

private static String getStoreNameFromSetting(CacheType cacheType, Settings settings) {
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
return cacheSettingForCacheType.get(settings);
}
}
Loading

0 comments on commit a436076

Please sign in to comment.