Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] Fix cache maximum size settings not working properly with pluggable caching #16636

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Fix max request cache size settings not working properly with pluggable caching ([#16636](https://github.com/opensearch-project/OpenSearch/pull/16636))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {

private final TieredSpilloverCacheStatsHolder statsHolder;

private final long onHeapCacheMaxWeight;
private final long diskCacheMaxWeight;

/**
* This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
* only once.
Expand Down Expand Up @@ -218,6 +221,8 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
cacheListMap.put(diskCache, new TierInfo(isDiskCacheEnabled, TIER_DIMENSION_VALUE_DISK));
this.caches = Collections.synchronizedMap(cacheListMap);
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
this.onHeapCacheMaxWeight = onHeapCacheSizeInBytes;
this.diskCacheMaxWeight = diskCacheSizeInBytes;
}

// Package private for testing
Expand Down Expand Up @@ -526,6 +531,16 @@ void updateStatsOnPut(String destinationTierValue, ICacheKey<K> key, V value) {
statsHolder.incrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
}

// pkg-private for testing
long getOnHeapCacheMaxWeight() {
return onHeapCacheMaxWeight;
}

// pkg-private for testing
long getDiskCacheMaxWeight() {
return diskCacheMaxWeight;
}

/**
* A class which receives removal events from the heap tier.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the onHeap cache size to be used within tiered cache.
* This setting overrides size settings from the heap tier implementation.
* For example, if OpenSearchOnHeapCache is the heap tier in the request cache, and
* indices.requests.cache.opensearch_onheap.size is set, that value will be ignored in favor of this setting.
*
* Pattern: {cache_type}.tiered_spillover.onheap.store.size
* Example: indices.request.cache.tiered_spillover.onheap.store.size
Expand All @@ -96,6 +99,9 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the disk cache size to be used within tiered cache.
* This setting overrides the size setting from the disk tier implementation.
* For example, if EhcacheDiskCache is the disk tier in the request cache, and
* indices.requests.cache.ehcache_disk.max_size_in_bytes is set, that value will be ignored in favor of this setting.
*/
public static final Setting.AffixSetting<Long> TIERED_SPILLOVER_DISK_STORE_SIZE = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void close() {

}

long getMaximumWeight() {
return maxSize;
}

public static class MockDiskCacheFactory implements Factory {

public static final String NAME = "mockDiskCache";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

import static org.opensearch.cache.common.tier.TieredSpilloverCache.ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.MIN_DISK_CACHE_SIZE_IN_BYTES;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
Expand Down Expand Up @@ -2166,6 +2167,134 @@ public void testDropStatsForDimensions() throws Exception {
assertEquals(new ImmutableCacheStats(0, 0, 0, 0, 0), tieredSpilloverCache.stats().getTotalStats());
}

public void testSegmentSizesWhenUsingFactory() {
// The TSC's tier size settings, TIERED_SPILLOVER_ONHEAP_STORE_SIZE and TIERED_SPILLOVER_DISK_STORE_SIZE,
// should always be respected, overriding the individual implementation's size settings if present
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
long heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;
int numSegments = getNumberOfSegments();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
// These two size settings should be honored
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedHeapSize + "b"
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedDiskSize
)
// The size setting from the OpenSearchOnHeap implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()).getKey(),
numSegments
)
.build();
String storagePath = getStoragePath(settings);

TieredSpilloverCache<String, String> tieredSpilloverCache = (TieredSpilloverCache<
String,
String>) new TieredSpilloverCache.TieredSpilloverCacheFactory().create(
new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
// 20_000_000 ns = 20 ms to compute
.setClusterSettings(clusterSettings)
.setStoragePath(storagePath)
.build(),
CacheType.INDICES_REQUEST_CACHE,
Map.of(
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME,
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
MockDiskCache.MockDiskCacheFactory.NAME,
// The size value passed in here acts as the "implementation setting" for the disk tier, and should also be ignored
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, false, keyValueSize)
)
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

public void testSegmentSizesWhenNotUsingFactory() {
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
int heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;

Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
// The size setting from the OpenSearchOnHeapCache implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.build();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
int numSegments = getNumberOfSegments();
CacheConfig<String, String> cacheConfig = getCacheConfig(1, settings, removalListener, numSegments);
TieredSpilloverCache<String, String> tieredSpilloverCache = getTieredSpilloverCache(
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, true, keyValueSize),
cacheConfig,
null,
removalListener,
numSegments,
expectedHeapSize,
expectedDiskSize
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

private void checkSegmentSizes(TieredSpilloverCache<String, String> cache, long expectedHeapSize, long expectedDiskSize) {
TieredSpilloverCache.TieredSpilloverCacheSegment<String, String> segment = cache.tieredSpilloverCacheSegments[0];
assertEquals(expectedHeapSize / cache.getNumberOfSegments(), segment.getOnHeapCacheMaxWeight());
assertEquals(expectedDiskSize / cache.getNumberOfSegments(), segment.getDiskCacheMaxWeight());
}

private List<String> getMockDimensions() {
List<String> dims = new ArrayList<>();
for (String dimensionName : dimensionNames) {
Expand Down Expand Up @@ -2455,9 +2584,9 @@ private void verifyComputeIfAbsentThrowsException(
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class EhcacheDiskCacheSettings {

/**
* Disk cache max size setting.
* If this cache is used as a tier in a TieredSpilloverCache, this setting is ignored.
*/
public static final Setting.AffixSetting<Long> DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_size_in_bytes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ private V deserializeValue(ByteArrayWrapper binary) {
return valueSerializer.deserialize(binary.value);
}

// Pkg-private for testing.
long getMaximumWeight() {
return maxWeightInBytes;
}

/**
* Factory to create an ehcache disk cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -1201,6 +1203,65 @@ public void testEhcacheCloseWithDestroyCacheMethodThrowingException() throws Exc
ehcacheDiskCache.close();
}

public void testWithCacheConfigSizeSettings() throws Exception {
// The cache should get its size from the config if present, and otherwise should get it from the setting.
long maxSizeFromSetting = between(MINIMUM_MAX_SIZE_IN_BYTES + 1000, MINIMUM_MAX_SIZE_IN_BYTES + 2000);
long maxSizeFromConfig = between(MINIMUM_MAX_SIZE_IN_BYTES + 3000, MINIMUM_MAX_SIZE_IN_BYTES + 4000);

EhcacheDiskCache<String, String> cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, false);
assertEquals(maxSizeFromSetting, cache.getMaximumWeight());

cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, true);
assertEquals(maxSizeFromConfig, cache.getMaximumWeight());
}

// Modified from OpenSearchOnHeapCacheTests. Can't reuse, as we can't add a dependency on the server.test module.
private EhcacheDiskCache<String, String> setupMaxSizeTest(long maxSizeFromSetting, long maxSizeFromConfig, boolean putSizeInConfig)
throws Exception {
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) {
Settings settings = Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, true)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_MAX_SIZE_IN_BYTES_KEY)
.getKey(),
maxSizeFromSetting
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(),
env.nodePaths()[0].indicesPath.toString() + "/request_cache/" + 0
)
.build();

CacheConfig.Builder<String, String> cacheConfigBuilder = new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setWeigher(getWeigher())
.setRemovalListener(listener)
.setSettings(settings)
.setDimensionNames(List.of(dimensionName))
.setStatsTrackingEnabled(true);
if (putSizeInConfig) {
cacheConfigBuilder.setMaxSizeInBytes(maxSizeFromConfig);
}

ICache.Factory cacheFactory = new EhcacheDiskCache.EhcacheDiskCacheFactory();
return (EhcacheDiskCache<String, String>) cacheFactory.create(
cacheConfigBuilder.build(),
CacheType.INDICES_REQUEST_CACHE,
null
);
}
}

static class MockEhcahceDiskCache extends EhcacheDiskCache<String, String> {

public MockEhcahceDiskCache(Builder<String, String> builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ public CacheService(Map<String, ICache.Factory> cacheStoreTypeFactories, Setting
}

public <K, V> ICache<K, V> createCache(CacheConfig<K, V> config, CacheType cacheType) {
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
String storeName = getStoreNameFromSetting(cacheType, settings);
if (!pluggableCachingEnabled(cacheType, settings)) {
peteralfonsi marked this conversation as resolved.
Show resolved Hide resolved
// Condition 1: In case feature flag is off, we default to onHeap.
// Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older
// settings, so we again fallback to onHeap to maintain backward compatibility.
Expand All @@ -74,4 +71,19 @@ public NodeCacheStats stats(CommonStatsFlags flags) {
}
return new NodeCacheStats(statsMap, flags);
}

/**
* Check if pluggable caching is on, and if a store type is present for this cache type.
*/
public static boolean pluggableCachingEnabled(CacheType cacheType, Settings settings) {
String storeName = getStoreNameFromSetting(cacheType, settings);
return FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) && storeName != null && !storeName.isBlank();
}

private static String getStoreNameFromSetting(CacheType cacheType, Settings settings) {
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
return cacheSettingForCacheType.get(settings);
}
}
Loading
Loading