Skip to content

Commit

Permalink
[Tiered Caching] Make took time policy dynamic and add additional int…
Browse files Browse the repository at this point in the history
…eg tests (opensearch-project#13063)

---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
  • Loading branch information
sgup432 authored and Peter Alfonsi committed Aug 30, 2024
1 parent 6f88777 commit 40a8d34
Show file tree
Hide file tree
Showing 13 changed files with 599 additions and 54 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
Expand All @@ -30,7 +34,7 @@ public class TookTimePolicy<V> implements Predicate<V> {
/**
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
*/
private final TimeValue threshold;
private TimeValue threshold;

/**
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
Expand All @@ -41,13 +45,25 @@ public class TookTimePolicy<V> implements Predicate<V> {
* Constructs a took time policy.
* @param threshold the threshold
* @param cachedResultParser the function providing policy values
* @param clusterSettings cluster settings
* @param cacheType cache type
*/
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
public TookTimePolicy(
TimeValue threshold,
Function<V, CachedQueryResult.PolicyValues> cachedResultParser,
ClusterSettings clusterSettings,
CacheType cacheType
) {
if (threshold.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
}
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold);
}

private void setThreshold(TimeValue threshold) {
this.threshold = threshold;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -47,6 +49,9 @@
@ExperimentalApi
public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;
Expand All @@ -69,8 +74,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (evaluatePolicies(notification.getValue())) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
} else {
removalListener.onRemoval(notification);
}
}
}
Expand All @@ -81,6 +89,7 @@ public void onRemoval(RemovalNotification<K, V> notification) {
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
.build(),
builder.cacheType,
builder.cacheFactories
Expand Down Expand Up @@ -156,10 +165,11 @@ public void invalidateAll() {
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<K> keys() {
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<K>(iterables);
}

@Override
Expand Down Expand Up @@ -213,6 +223,67 @@ boolean evaluatePolicies(V value) {
return true;
}

/**
* ConcatenatedIterables which combines cache iterables and supports remove() functionality as well if underlying
* iterator supports it.
* @param <K> Type of key.
*/
static class ConcatenatedIterables<K> implements Iterable<K> {

final Iterable<K>[] iterables;

ConcatenatedIterables(Iterable<K>[] iterables) {
this.iterables = iterables;
}

@SuppressWarnings({ "unchecked" })
@Override
public Iterator<K> iterator() {
Iterator<K>[] iterators = (Iterator<K>[]) new Iterator<?>[iterables.length];
for (int i = 0; i < iterables.length; i++) {
iterators[i] = iterables[i].iterator();
}
return new ConcatenatedIterator<>(iterators);
}

static class ConcatenatedIterator<T> implements Iterator<T> {
private final Iterator<T>[] iterators;
private int currentIteratorIndex;
private Iterator<T> currentIterator;

public ConcatenatedIterator(Iterator<T>[] iterators) {
this.iterators = iterators;
this.currentIteratorIndex = 0;
this.currentIterator = iterators[currentIteratorIndex];
}

@Override
public boolean hasNext() {
while (!currentIterator.hasNext()) {
currentIteratorIndex++;
if (currentIteratorIndex == iterators.length) {
return false;
}
currentIterator = iterators[currentIteratorIndex];
}
return true;
}

@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return currentIterator.next();
}

@Override
public void remove() {
currentIterator.remove();
}
}
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down Expand Up @@ -253,8 +324,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)
.get(settings);
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
Expand All @@ -266,7 +336,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Plugin for TieredSpilloverCache.
*/
Expand Down Expand Up @@ -51,11 +53,7 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.NodeScope;
Expand Down Expand Up @@ -42,17 +45,36 @@ public class TieredSpilloverCacheSettings {
/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
TimeValue.ZERO, // Minimum value for this setting
NodeScope
NodeScope,
Setting.Property.Dynamic
)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Stores took time policy settings for various cache types as these are dynamic so that can be registered and
* retrieved accordingly.
*/
public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Fetches concrete took time policy settings.
*/
static {
Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
}
TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap;
}

/**
* Default constructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.function.Function;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
try {
Expand All @@ -35,8 +42,17 @@ public class TookTimePolicyTests extends OpenSearchTestCase {
}
};

private ClusterSettings clusterSettings;

@Before
public void setup() {
Settings settings = Settings.EMPTY;
clusterSettings = new ClusterSettings(settings, new HashSet<>());
clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE));
}

private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
return new TookTimePolicy<>(threshold, transformationFunction);
return new TookTimePolicy<>(threshold, transformationFunction, clusterSettings, CacheType.INDICES_REQUEST_CACHE);
}

public void testTookTimePolicy() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

public class MockDiskCache<K, V> implements ICache<K, V> {
Expand Down Expand Up @@ -79,7 +81,7 @@ public void invalidateAll() {

@Override
public Iterable<K> keys() {
return this.cache.keySet();
return () -> new CacheKeyIterator<>(cache, removalListener);
}

@Override
Expand Down Expand Up @@ -156,4 +158,48 @@ public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
}

}

/**
* Provides a iterator over keys.
* @param <K> Type of key
* @param <V> Type of value
*/
static class CacheKeyIterator<K, V> implements Iterator<K> {
private final Iterator<Map.Entry<K, V>> entryIterator;
private final Map<K, V> cache;
private final RemovalListener<K, V> removalListener;
private K currentKey;

public CacheKeyIterator(Map<K, V> cache, RemovalListener<K, V> removalListener) {
this.entryIterator = cache.entrySet().iterator();
this.removalListener = removalListener;
this.cache = cache;
}

@Override
public boolean hasNext() {
return entryIterator.hasNext();
}

@Override
public K next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Map.Entry<K, V> entry = entryIterator.next();
currentKey = entry.getKey();
return currentKey;
}

@Override
public void remove() {
if (currentKey == null) {
throw new IllegalStateException("No element to remove");
}
V value = cache.get(currentKey);
cache.remove(currentKey);
this.removalListener.onRemoval(new RemovalNotification<>(currentKey, value, RemovalReason.INVALIDATED));
currentKey = null;
}
}
}
Loading

0 comments on commit 40a8d34

Please sign in to comment.