Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into snapshot-status-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aggarwalShivani authored Apr 15, 2024
2 parents d1e6656 + 416083c commit 054e53e
Show file tree
Hide file tree
Showing 46 changed files with 2,645 additions and 655 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/wrapper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: gradle/wrapper-validation-action@v2
- uses: gradle/wrapper-validation-action@v3
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand All @@ -31,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `org.apache.commons:commonslang` from 3.13.0 to 3.14.0 ([#12627](https://github.com/opensearch-project/OpenSearch/pull/12627))
- Bump Apache Tika from 2.6.0 to 2.9.2 ([#12627](https://github.com/opensearch-project/OpenSearch/pull/12627))
- Bump `com.gradle.enterprise` from 3.16.2 to 3.17 ([#13116](https://github.com/opensearch-project/OpenSearch/pull/13116))
- Bump `gradle/wrapper-validation-action` from 2 to 3 ([#13192](https://github.com/opensearch-project/OpenSearch/pull/13192))

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -54,7 +56,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;

// The listener for removals from the spillover cache as a whole
// TODO: In TSC stats PR, each tier will have its own separate removal listener.
private final RemovalListener<ICacheKey<K>, V> removalListener;
private final List<String> dimensionNames;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
Expand All @@ -70,9 +76,9 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
Expand All @@ -87,6 +93,7 @@ && evaluatePolicies(notification.getValue())) {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
Expand All @@ -97,7 +104,7 @@ && evaluatePolicies(notification.getValue())) {
);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

Expand All @@ -112,19 +119,19 @@ ICache<K, V> getDiskCache() {
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
Expand All @@ -141,7 +148,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
Expand All @@ -167,9 +174,9 @@ public void invalidateAll() {
*/
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<K> keys() {
Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<K>(iterables);
public Iterable<ICacheKey<K>> keys() {
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<ICacheKey<K>>(iterables);
}

@Override
Expand Down Expand Up @@ -197,7 +204,12 @@ public void close() throws IOException {
}
}

private Function<K, V> getValueFromTieredCache() {
@Override
public ImmutableCacheStatsHolder stats() {
return null; // TODO: in TSC stats PR
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
Expand Down Expand Up @@ -354,7 +366,7 @@ public String getCacheName() {
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private RemovalListener<ICacheKey<K>, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
Expand Down Expand Up @@ -390,7 +402,7 @@ public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
public Builder<K, V> setRemovalListener(RemovalListener<ICacheKey<K>, V> removalListener) {
this.removalListener = removalListener;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand All @@ -25,27 +27,27 @@

public class MockDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
Map<ICacheKey<K>, V> cache;
int maxSize;
long delay;

private final RemovalListener<K, V> removalListener;
private final RemovalListener<ICacheKey<K>, V> removalListener;

public MockDiskCache(int maxSize, long delay, RemovalListener<K, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<K, V>();
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
}
Expand All @@ -58,7 +60,7 @@ public void put(K key, V value) {
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
Expand All @@ -70,7 +72,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
this.cache.remove(key);
}

Expand All @@ -80,7 +82,7 @@ public void invalidateAll() {
}

@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return () -> new CacheKeyIterator<>(cache, removalListener);
}

Expand All @@ -92,6 +94,11 @@ public long count() {
@Override
public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public void close() {

Expand Down
Loading

0 comments on commit 054e53e

Please sign in to comment.