Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching Stats] TSC stats implementation + integration #25

Open
wants to merge 28 commits into
base: tiramisu-stats-tiers
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cfdd89c
Added TSC stats implementation
Mar 15, 2024
1432abb
Removed redundant tests
Mar 15, 2024
1bebdd3
Merge branch 'tiramisu-stats-base' into tiramisu-stats-tsc
Mar 15, 2024
3605730
Hooked up API to CacheStats output
Mar 15, 2024
7bf4d2e
Added logic in CacheStats toXContent()
Mar 15, 2024
6c1dffe
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-api
Mar 27, 2024
ca31bb7
Simplified toXContent
Mar 27, 2024
0dd1057
Finished xcontent UT
Mar 28, 2024
3a90973
readded tests depending on xcontent
Mar 28, 2024
e6dca8d
changed statsholder key to contain whole dimension
Mar 29, 2024
08b688f
Revert "changed statsholder key to contain whole dimension"
Apr 1, 2024
82a8454
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-api
Apr 1, 2024
ee808ba
Fixed tests with most recent changes
Apr 1, 2024
cf0a47e
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-api
Apr 1, 2024
05541e0
Added store name to api response
Apr 1, 2024
83facb2
first chunk of IT
Apr 1, 2024
7d1c385
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-api
Apr 8, 2024
8a9ce1f
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-api
Apr 8, 2024
c385359
Finished merge
Apr 8, 2024
a594e47
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-tsc
Apr 9, 2024
96c9493
First draft of single-StatsHolder setup
Apr 9, 2024
c4cea24
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-api
Apr 9, 2024
e7d9261
Merge branch 'tiramisu-stats-api' into tiramisu-stats-tsc
Apr 9, 2024
7fec02d
Updated tests to check stats values
Apr 9, 2024
7db6385
Revert "Merge branch 'tiramisu-stats-api' into tiramisu-stats-tsc"
Apr 10, 2024
b3aecc4
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-tsc
Apr 10, 2024
585e4cb
Simplified tsc stats testing logic
Apr 10, 2024
a77ee97
Merge branch 'tiramisu-stats-tiers' into tiramisu-stats-tsc
Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.cache.stats.StatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -35,6 +38,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand All @@ -52,9 +56,16 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;

private final RemovalListener<ICacheKey<K>, V> onDiskRemovalListener;
private final RemovalListener<ICacheKey<K>, V> onHeapRemovalListener;

// The listener for removals from the spillover cache as a whole
// TODO: In TSC stats PR, each tier will have its own separate removal listener.
private final RemovalListener<ICacheKey<K>, V> removalListener;

// In future we want to just read the stats from the individual tiers' statsHolder objects, but this isn't
// possible right now because of the way computeIfAbsent is implemented.
private final StatsHolder statsHolder;
private ToLongBiFunction<ICacheKey<K>, V> weigher;
private final List<String> dimensionNames;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
Expand All @@ -63,24 +74,25 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final List<Tuple<ICache<K, V>, String>> cacheAndTierValueList;
private final List<Predicate<V>> policies;

// Common values used for tier dimension
public static final String TIER_DIMENSION_NAME = "tier";
public static final String TIER_DIMENSION_VALUE_ON_HEAP = "on_heap";
public static final String TIER_DIMENSION_VALUE_DISK = "disk";

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapRemovalListener = new HeapTierRemovalListener(this);
this.onDiskRemovalListener = new DiskTierRemovalListener(this);
this.weigher = Objects.requireNonNull(builder.cacheConfig.getWeigher(), "Weigher can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
}
}
}
})
new CacheConfig.Builder<K, V>().setRemovalListener(onHeapRemovalListener)
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
Expand All @@ -91,11 +103,28 @@ public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
.build(),
builder.cacheType,
builder.cacheFactories
);

this.diskCache = builder.diskCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(onDiskRemovalListener)
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.build(),
builder.cacheType,
builder.cacheFactories
);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.cacheAndTierValueList = List.of(
new Tuple<>(onHeapCache, TIER_DIMENSION_VALUE_ON_HEAP),
new Tuple<>(diskCache, TIER_DIMENSION_VALUE_DISK)
);
// Pass "tier" as the innermost dimension name, in addition to whatever dimensions are specified for the cache as a whole
this.statsHolder = new StatsHolder(addTierValueToDimensionValues(dimensionNames, TIER_DIMENSION_NAME));
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

Expand All @@ -118,12 +147,12 @@ public V get(ICacheKey<K> key) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
}

@Override
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
Expand All @@ -132,6 +161,10 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
// The value was just computed and added to the cache
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, value);
}
}
return value;
}
Expand All @@ -143,9 +176,14 @@ public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
// We don't update stats here, as this is handled by the removal listeners for the tiers.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
for (Tuple<ICache<K, V>, String> pair : cacheAndTierValueList) {
if (key.getDropStatsForDimensions()) {
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, pair.v2());
statsHolder.removeDimensions(dimensionValues);
}
pair.v1().invalidate(key);
}
}
}
Expand All @@ -157,6 +195,7 @@ public void invalidateAll() {
cache.invalidateAll();
}
}
statsHolder.reset();
}

/**
Expand All @@ -171,11 +210,7 @@ public Iterable<ICacheKey<K>> keys() {

@Override
public long count() {
long count = 0;
for (ICache<K, V> cache : cacheList) {
count += cache.count();
}
return count;
return statsHolder.count();
}

@Override
Expand All @@ -196,26 +231,78 @@ public void close() throws IOException {

@Override
public CacheStats stats() {
return null; // TODO: in TSC stats PR
return statsHolder.getCacheStats();
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
V value = cache.get(key);
for (Tuple<ICache<K, V>, String> pair : cacheAndTierValueList) {
V value = pair.v1().get(key);
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, pair.v2()); // Get the tier value
// corresponding to this cache
if (value != null) {
// update hit stats
statsHolder.incrementHits(dimensionValues);
return value;
} else {
// update miss stats
statsHolder.incrementMisses(dimensionValues);
}
}
}
return null;
};
}

void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification) {
ICacheKey<K> key = notification.getKey();

boolean wasEvicted = false;
if (RemovalReason.EVICTED.equals(notification.getRemovalReason())
|| RemovalReason.CAPACITY.equals(notification.getRemovalReason())) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (evaluatePolicies(notification.getValue())) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
}
}
wasEvicted = true;
}

else {
// If the removal was for another reason, send this notification to the TSC's removal listener, as the value is leaving the TSC
// entirely
removalListener.onRemoval(notification);
}
updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue());
}

void handleRemovalFromDiskTier(RemovalNotification<ICacheKey<K>, V> notification) {
// Values removed from the disk tier leave the TSC entirely
removalListener.onRemoval(notification);

boolean wasEvicted = false;
if (RemovalReason.EVICTED.equals(notification.getRemovalReason())
|| RemovalReason.CAPACITY.equals(notification.getRemovalReason())) {
wasEvicted = true;
}
updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue());
}

void updateStatsOnRemoval(String removedFromTierValue, boolean wasEvicted, ICacheKey<K> key, V value) {
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, removedFromTierValue);
if (wasEvicted) {
statsHolder.incrementEvictions(dimensionValues);
}
statsHolder.decrementEntries(dimensionValues);
statsHolder.decrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
}

void updateStatsOnPut(String destinationTierValue, ICacheKey<K> key, V value) {
List<String> dimensionValues = addTierValueToDimensionValues(key.dimensions, destinationTierValue);
statsHolder.incrementEntries(dimensionValues);
statsHolder.incrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
}

boolean evaluatePolicies(V value) {
for (Predicate<V> policy : policies) {
if (!policy.test(value)) {
Expand All @@ -225,6 +312,47 @@ boolean evaluatePolicies(V value) {
return true;
}

/**
* Add tierValue to the end of a copy of the initial dimension values.
*/
private List<String> addTierValueToDimensionValues(List<String> initialDimensions, String tierValue) {
List<String> result = new ArrayList<>(initialDimensions);
result.add(tierValue);
return result;
}

/**
* A class which receives removal events from the heap tier.
*/
private class HeapTierRemovalListener implements RemovalListener<ICacheKey<K>, V> {
private final TieredSpilloverCache<K, V> tsc;

HeapTierRemovalListener(TieredSpilloverCache<K, V> tsc) {
this.tsc = tsc;
}

@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
tsc.handleRemovalFromHeapTier(notification);
}
}

/**
* A class which receives removal events from the disk tier.
*/
private class DiskTierRemovalListener implements RemovalListener<ICacheKey<K>, V> {
private final TieredSpilloverCache<K, V> tsc;

DiskTierRemovalListener(TieredSpilloverCache<K, V> tsc) {
this.tsc = tsc;
}

@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
tsc.handleRemovalFromDiskTier(notification);
}
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>

@Override
public void invalidate(ICacheKey<K> key) {
removalListener.onRemoval(new RemovalNotification<>(key, cache.get(key), RemovalReason.INVALIDATED));
this.cache.remove(key);
}

Expand Down
Loading
Loading