From 52b9dfdb0b4d8b564eb7451e0344c363496aeab1 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 13 Nov 2023 17:24:07 -0800 Subject: [PATCH 01/21] Update IndicesRequestCache.java to delete stale keys from diskcache --- .../indices/IndicesRequestCache.java | 85 +++++++++++++++---- 1 file changed, 70 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 2fbdd43abb23d..665c01b99d2c3 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -170,6 +170,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); - currentKeysToClean.clear(); - currentFullClean.clear(); - for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { + + Iterator iterator = keysToClean.iterator(); + while(iterator.hasNext()) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) { - // null indicates full cleanup, as does a closed shard + if (needsFullClean(cleanupKey)) { currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { currentKeysToClean.add(cleanupKey); } } - if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { - for (Iterator iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { - Key key = iterator.next(); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { + + // Early exit if no cleanup is needed + if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { + return; + } + cleanTieredCaches(currentKeysToClean, currentFullClean); + } + + private void cleanTieredCaches(Set currentKeysToClean, Set currentFullClean) { + cleanOnHeapCache(currentKeysToClean, currentFullClean); + cleanDiskCache(currentKeysToClean, currentFullClean); + } + + // keeping this unsynchronized since we don't expect it to be called only by cleanCache which is synchronized + private void cleanDiskCache(Set currentKeysToClean, Set currentFullClean) { + if (tieredCacheService.getDiskCachingTier().isEmpty()) { + logger.debug("Skipping disk cache keys cleanup since no disk cache is configured"); + return; + } + final double cleanupKeysThresholdPercentage = 50.0; // TODO make this an index setting + int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier().get().count(); + int totalKeysToCleanup = currentKeysToClean.size() + currentFullClean.size(); + + double cleanupKeysPercentage = ((double) totalKeysToCleanup / totalKeysInDiskCache) * 100; + if (cleanupKeysPercentage < cleanupKeysThresholdPercentage) { + logger.debug("Skipping disk cache keys cleanup since the keys to cleanup of {}% is not greater than " + + "the threshold percentage of {}%", cleanupKeysPercentage, cleanupKeysThresholdPercentage); + return; + } + + Iterator iterator = tieredCacheService.getDiskCachingTier().get().keys().iterator(); + while (iterator.hasNext()) { + Key key = iterator.next(); + if (currentFullClean.contains(key.entity.getCacheIdentity())) { + iterator.remove(); + currentFullClean.remove(key.entity.getCacheIdentity()); + } else { + CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); + if (currentKeysToClean.contains(cleanupKey)) { + iterator.remove(); + currentKeysToClean.remove(cleanupKey); + } + 
} + } + } + + // keeping this unsynchronized since we don't expect it to be called only by cleanCache which is synchronized + private void cleanOnHeapCache(Set currentKeysToClean, Set currentFullClean) { + Iterator iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); + while (iterator.hasNext()) { + Key key = iterator.next(); + if (currentFullClean.contains(key.entity.getCacheIdentity())) { + iterator.remove(); + currentFullClean.remove(key.entity.getCacheIdentity()); + } else { + CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); + if (currentKeysToClean.contains(cleanupKey)) { iterator.remove(); - } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { - iterator.remove(); - } + currentKeysToClean.remove(cleanupKey); } } } tieredCacheService.getOnHeapCachingTier().refresh(); } + private boolean needsFullClean(CleanupKey cleanupKey) { + // null indicates full cleanup, as does a closed shard + return cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen(); + } + /** * Returns the current size of the cache */ From 5bab65d449900c89b0a756c48a154d7a116129d3 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 16 Nov 2023 21:55:51 -0800 Subject: [PATCH 02/21] More updates to disk cache cleanup --- .../indices/IndicesRequestCache.java | 138 ++++++++------ .../opensearch/indices/IndicesService.java | 136 +++++++++++--- .../indices/IndicesRequestCacheTests.java | 170 +++++++++++++++++- 3 files changed, 360 insertions(+), 84 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 665c01b99d2c3..fb7662a57a6de 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -42,8 +42,9 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.tier.BytesReferenceSerializer; import org.opensearch.common.cache.tier.CachePolicyInfoWrapper; -import org.opensearch.common.cache.tier.DiskTierTookTimePolicy; import org.opensearch.common.cache.tier.CacheValue; +import org.opensearch.common.cache.tier.CachingTier; +import org.opensearch.common.cache.tier.DiskTierTookTimePolicy; import org.opensearch.common.cache.tier.EhCacheDiskCachingTier; import org.opensearch.common.cache.tier.OnHeapCachingTier; import org.opensearch.common.cache.tier.OpenSearchOnHeapCache; @@ -64,7 +65,6 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.search.query.QuerySearchResult; import java.io.Closeable; import java.io.IOException; @@ -174,6 +174,19 @@ public final class IndicesRequestCache implements TieredCacheEventListener tieredCacheService + ) { + this.size = INDICES_CACHE_QUERY_SIZE.get(settings); + this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? 
INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; + long sizeInBytes = size.getBytes(); + this.indicesService = indicesService; + this.tieredCacheService = tieredCacheService; + } + @Override public void close() { tieredCacheService.invalidateAll(); @@ -238,9 +251,10 @@ BytesReference getOrCompute( /** * Invalidates the given the cache entry for the given key and it's context + * * @param cacheEntity the cache entity to invalidate for - * @param reader the reader to invalidate the cache entry for - * @param cacheKey the cache key to invalidate + * @param reader the reader to invalidate the cache entry for + * @param cacheKey the cache key to invalidate */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; @@ -426,83 +440,87 @@ public int hashCode() { /** * Logic to clean up in-memory cache. */ - synchronized void cleanCache() { // TODO rename this method to plural or cleanTieredCache ? + synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); - Iterator iterator = keysToClean.iterator(); - while(iterator.hasNext()) { - CleanupKey cleanupKey = iterator.next(); - iterator.remove(); - if (needsFullClean(cleanupKey)) { - currentFullClean.add(cleanupKey.entity.getCacheIdentity()); - } else { - currentKeysToClean.add(cleanupKey); - } - } + categorizeKeysForCleanup(currentKeysToClean, currentFullClean); // Early exit if no cleanup is needed if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { return; } - cleanTieredCaches(currentKeysToClean, currentFullClean); + + cleanUpKeys( + tieredCacheService.getOnHeapCachingTier().keys().iterator(), + currentKeysToClean, + currentFullClean + ); + tieredCacheService.getOnHeapCachingTier().refresh(); } - private void cleanTieredCaches(Set currentKeysToClean, Set currentFullClean) { - cleanOnHeapCache(currentKeysToClean, currentFullClean); - cleanDiskCache(currentKeysToClean, currentFullClean); + /** + * Logic to clean up disk based cache. + *

+ * TODO: cleanDiskCache is very specific to disk caching tier. We can refactor this to be more generic. + */ + synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) { + tieredCacheService.getDiskCachingTier().ifPresent(diskCachingTier -> { + if (diskCachingTier.count() == 0 || diskCleanupKeysPercentage() < diskCachesCleanThresholdPercent) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping disk cache keys cleanup"); + return; + } + } + Set currentKeysToClean = new HashSet<>(); + Set currentFullClean = new HashSet<>(); + + categorizeKeysForCleanup(currentKeysToClean, currentFullClean); + + // Early exit if no cleanup is needed + if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { + return; + } + cleanUpKeys(diskCachingTier.keys().iterator(), currentKeysToClean, currentFullClean); + }); } - // keeping this unsynchronized since we don't expect it to be called only by cleanCache which is synchronized - private void cleanDiskCache(Set currentKeysToClean, Set currentFullClean) { - if (tieredCacheService.getDiskCachingTier().isEmpty()) { - logger.debug("Skipping disk cache keys cleanup since no disk cache is configured"); - return; - } - final double cleanupKeysThresholdPercentage = 50.0; // TODO make this an index setting - int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier().get().count(); - int totalKeysToCleanup = currentKeysToClean.size() + currentFullClean.size(); - - double cleanupKeysPercentage = ((double) totalKeysToCleanup / totalKeysInDiskCache) * 100; - if (cleanupKeysPercentage < cleanupKeysThresholdPercentage) { - logger.debug("Skipping disk cache keys cleanup since the keys to cleanup of {}% is not greater than " + - "the threshold percentage of {}%", cleanupKeysPercentage, cleanupKeysThresholdPercentage); - return; + synchronized double diskCleanupKeysPercentage() { + int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier() + .map(CachingTier::count) + .orElse(0); + if (totalKeysInDiskCache == 0 || keysToClean.isEmpty()) { + return 0; } + return ((double) keysToClean.size() / totalKeysInDiskCache) * 100; + } - Iterator iterator = tieredCacheService.getDiskCachingTier().get().keys().iterator(); + synchronized void cleanUpKeys(Iterator iterator, Set currentKeysToClean, Set currentFullClean) { while (iterator.hasNext()) { Key key = iterator.next(); + CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); currentFullClean.remove(key.entity.getCacheIdentity()); - } else { - CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); - if (currentKeysToClean.contains(cleanupKey)) { - iterator.remove(); - currentKeysToClean.remove(cleanupKey); - } + keysToClean.remove(cleanupKey); // since a key could be either in onHeap or disk cache. 
+ } else if (currentKeysToClean.contains(cleanupKey)) { + iterator.remove(); + currentKeysToClean.remove(cleanupKey); + keysToClean.remove(cleanupKey); } } } - // keeping this unsynchronized since we don't expect it to be called only by cleanCache which is synchronized - private void cleanOnHeapCache(Set currentKeysToClean, Set currentFullClean) { - Iterator iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); + private void categorizeKeysForCleanup(Set currentKeysToClean, Set currentFullClean) { + Iterator iterator = keysToClean.iterator(); while (iterator.hasNext()) { - Key key = iterator.next(); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { - iterator.remove(); - currentFullClean.remove(key.entity.getCacheIdentity()); + CleanupKey cleanupKey = iterator.next(); + if (needsFullClean(cleanupKey)) { + currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { - CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); - if (currentKeysToClean.contains(cleanupKey)) { - iterator.remove(); - currentKeysToClean.remove(cleanupKey); - } + currentKeysToClean.add(cleanupKey); } } - tieredCacheService.getOnHeapCachingTier().refresh(); } private boolean needsFullClean(CleanupKey cleanupKey) { @@ -520,4 +538,16 @@ long count() { int numRegisteredCloseListeners() { // for testing return registeredClosedListeners.size(); } + + void addCleanupKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing + keysToClean.add(new CleanupKey(entity, readerCacheKeyId)); + } + + int getKeysToCleanSizeForTesting() { // for testing + return keysToClean.size(); + } + + Key createKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing + return new Key(entity, null, readerCacheKeyId); + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f513f067bd9be..497458fd5927e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -61,7 +61,6 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; -import org.opensearch.common.cache.Cache; import org.opensearch.common.cache.tier.CachePolicyInfoWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; @@ -216,11 +215,24 @@ public class IndicesService extends AbstractLifecycleComponent private static final Logger logger = LogManager.getLogger(IndicesService.class); public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout"; + public static final String INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY = "cluster.request_cache.disk.cleanup_threshold_percentage"; + public static final String INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_DEFAULT_VALUE = "50%"; + + public static final String INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING_KEY = "indices.request_cache.disk.cleanup_interval"; + + public static final Setting INDICES_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting( "indices.cache.cleanup_interval", TimeValue.timeValueMinutes(1), Property.NodeScope ); + + public static final Setting INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting( + INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING_KEY, + TimeValue.timeValueMinutes(1), + Property.NodeScope + ); + public static final 
Setting INDICES_ID_FIELD_DATA_ENABLED_SETTING = Setting.boolSetting( "indices.id_field_data.enabled", true, @@ -228,6 +240,31 @@ public class IndicesService extends AbstractLifecycleComponent Property.NodeScope ); + public static final Setting INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING = + Setting.simpleString( + INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY, + INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_DEFAULT_VALUE, + value -> { + String errorLogBase = "The value of the setting " + + INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY + + " must be "; + if (!value.endsWith("%")) { + throw new IllegalArgumentException(errorLogBase + "a percentage"); + } + String rawValue = value.substring(0, value.length() - 1); + try { + double doubleValue = Double.parseDouble(rawValue); + if (doubleValue < 0 || doubleValue > 100) { + throw new IllegalArgumentException(errorLogBase + "between 0% and 100%"); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException(errorLogBase + "a valid percentage", e); + } + }, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting WRITE_DANGLING_INDICES_INFO_SETTING = Setting.boolSetting( "gateway.write_dangling_indices_info", true, @@ -325,6 +362,7 @@ public class IndicesService extends AbstractLifecycleComponent private final IndexScopedSettings indexScopedSettings; private final IndicesFieldDataCache indicesFieldDataCache; private final CacheCleaner cacheCleaner; + private final DiskCacheCleaner diskCacheCleaner; private final ThreadPool threadPool; private final CircuitBreakerService circuitBreakerService; private final BigArrays bigArrays; @@ -338,7 +376,9 @@ public class IndicesService extends AbstractLifecycleComponent private final MapperRegistry mapperRegistry; private final NamedWriteableRegistry namedWriteableRegistry; private final IndexingMemoryController indexingMemoryController; - private final TimeValue cleanInterval; + private final TimeValue onHeapCachesCleanInterval; + private final TimeValue diskCachesCleanInterval; + private final double diskCachesCleanThreshold; final IndicesRequestCache indicesRequestCache; // pkg-private for testing private final IndicesQueryCache indicesQueryCache; private final MetaStateService metaStateService; @@ -366,8 +406,9 @@ public class IndicesService extends AbstractLifecycleComponent @Override protected void doStart() { - // Start thread that will manage cleaning the field data cache periodically - threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME); + // Start threads that will manage cleaning the field data and request caches periodically + threadPool.schedule(this.cacheCleaner, this.onHeapCachesCleanInterval, ThreadPool.Names.SAME); + threadPool.schedule(this.diskCacheCleaner, this.diskCachesCleanInterval, ThreadPool.Names.SAME); } public IndicesService( @@ -435,8 +476,11 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes); } }); - this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings); - this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval); + this.onHeapCachesCleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings); + this.diskCachesCleanInterval = INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.get(settings); + this.diskCachesCleanThreshold = getCleanupKeysThresholdPercentage(settings);; + 
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.onHeapCachesCleanInterval); + this.diskCacheCleaner = new DiskCacheCleaner(indicesRequestCache, logger, threadPool, this.diskCachesCleanInterval, this.diskCachesCleanThreshold); this.metaStateService = metaStateService; this.engineFactoryProviders = engineFactoryProviders; @@ -496,6 +540,12 @@ protected void closeInternal() { this.recoverySettings = recoverySettings; } + private double getCleanupKeysThresholdPercentage(Settings settings) { + String threshold = INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.get(settings); + String rawValue = threshold.substring(0, threshold.length() - 1); + return Double.parseDouble(rawValue) / 100; + } + /** * The changes to dynamic cluster setting {@code cluster.default.index.refresh_interval} needs to be updated. This * method gets called whenever the setting changes. We set the instance variable with the updated value as this is @@ -1564,21 +1614,37 @@ public AnalysisRegistry getAnalysis() { return analysisRegistry; } + private static abstract class AbstractCacheCleaner implements Runnable, Releasable { + + protected final Logger logger; + protected final ThreadPool threadPool; + protected final TimeValue interval; + protected final AtomicBoolean closed = new AtomicBoolean(false); + + AbstractCacheCleaner( + Logger logger, + ThreadPool threadPool, + TimeValue interval + ) { + this.logger = logger; + this.threadPool = threadPool; + this.interval = interval; + } + + @Override + public void close() { + closed.compareAndSet(false, true); + } + } + /** - * FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache - * periodically. In this case it is the field data cache, because a cache that - * has an entry invalidated may not clean up the entry if it is not read from - * or written to after invalidation. + * CacheCleaner is a scheduled Runnable used to clean Field Data Caches and/or request caches periodically. 
* * @opensearch.internal */ - private static final class CacheCleaner implements Runnable, Releasable { + private static final class CacheCleaner extends AbstractCacheCleaner implements Runnable, Releasable { private final IndicesFieldDataCache cache; - private final Logger logger; - private final ThreadPool threadPool; - private final TimeValue interval; - private final AtomicBoolean closed = new AtomicBoolean(false); private final IndicesRequestCache requestCache; CacheCleaner( @@ -1588,11 +1654,9 @@ private static final class CacheCleaner implements Runnable, Releasable { ThreadPool threadPool, TimeValue interval ) { + super(logger, threadPool, interval); this.cache = cache; this.requestCache = requestCache; - this.logger = logger; - this.threadPool = threadPool; - this.interval = interval; } @Override @@ -1620,13 +1684,43 @@ public void run() { } // Reschedule itself to run again if not closed if (closed.get() == false) { - threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); + this.threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); } } + } + + private static final class DiskCacheCleaner extends AbstractCacheCleaner implements Runnable, Releasable { + + private final IndicesRequestCache requestCache; + private final double diskCachesCleanThresholdPercent; + + DiskCacheCleaner( + IndicesRequestCache requestCache, + Logger logger, + ThreadPool threadPool, + TimeValue interval, + double diskCachesCleanThresholdPercent + ) { + super(logger, threadPool, interval); + this.diskCachesCleanThresholdPercent = diskCachesCleanThresholdPercent; + this.requestCache = requestCache; + } @Override - public void close() { - closed.compareAndSet(false, true); + public void run() { + long startTimeNS = System.nanoTime(); + if (logger.isTraceEnabled()) { + logger.trace("running periodic disk based request cache cleanup"); + } + try { + this.requestCache.cleanDiskCache(diskCachesCleanThresholdPercent); + } catch (Exception e) { + logger.warn("Exception during periodic request cache cleanup:", e); + } + // Reschedule itself to run again if not closed + if (closed.get() == false) { + threadPool.scheduleUnlessShuttingDown(interval, ThreadPool.Names.SAME, this); + } } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index aee54dcf3379c..b323cccc16290 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,6 +52,13 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.UUIDs; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.tier.DiskCachingTier; +import org.opensearch.common.cache.tier.TieredCacheService; +import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; @@ -78,13 +85,23 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.indices.IndicesRequestCache.CacheEntity; +import 
org.opensearch.indices.IndicesRequestCache.Key; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.atLeastOnce; + public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { @@ -291,7 +308,7 @@ public void testEviction() throws Exception { writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard); + TestEntity thirdEntity = new TestEntity(requestCacheStats, indexShard); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); @@ -299,7 +316,7 @@ public void testEviction() throws Exception { BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(thirdEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); assertEquals(1, requestCacheStats.stats().getEvictions()); @@ -330,7 +347,7 @@ public void testClearAllEntityIdentity() throws Exception { writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); AtomicBoolean differentIdentity = new AtomicBoolean(true); - TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity); + TestEntity thirdEntity = new TestEntity(requestCacheStats, differentIdentity); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); @@ -338,16 +355,16 @@ public void testClearAllEntityIdentity() throws Exception { BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(thirdEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); final long hitCount = requestCacheStats.stats().getHitCount(); - // clear all for the indexShard Idendity even though is't still open + // clear all for the indexShard Identity even though it isn't still open cache.clear(randomFrom(entity, secondEntity)); cache.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity - value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + value3 = cache.getOrCompute(thirdEntity, thirdLoader, 
thirdReader, termBytes); assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount()); assertEquals("baz", value3.streamInput().readString()); @@ -357,8 +374,8 @@ public void testClearAllEntityIdentity() throws Exception { public Iterable newDoc(int id, String value) { return Arrays.asList( - newField("id", Integer.toString(id), StringField.TYPE_STORED), - newField("value", value, StringField.TYPE_STORED) + newField("id", Integer.toString(id), StringField.TYPE_STORED), + newField("value", value, StringField.TYPE_STORED) ); } @@ -631,6 +648,141 @@ public long ramBytesUsed() { } @Override - public void writeTo(StreamOutput out) throws IOException {} + public void writeTo(StreamOutput out) throws IOException { + } + } + + public static class CleanDiskCacheTests { + private IndicesRequestCache indicesRequestCache; + private TieredCacheService tieredCacheService; + private DiskCachingTier diskCachingTier; + private IndicesService indicesService; + + @Before + public void setup() { + tieredCacheService = mock(TieredCacheService.class); + diskCachingTier = mock(DiskCachingTier.class); + indicesService = mock(IndicesService.class); + indicesRequestCache = new IndicesRequestCache(Settings.EMPTY, indicesService, tieredCacheService); + } + + @Test + public void shouldNotCleanDiskCacheWhenEmpty() { + final int DISK_CACHE_COUNT = 0; + final double CLEANUP_THRESHOLD = 50.0; + + when(tieredCacheService.getDiskCachingTier()).thenReturn( + (Optional>) Optional.of(diskCachingTier) + ); + when(diskCachingTier.count()).thenReturn(DISK_CACHE_COUNT); + + indicesRequestCache.cleanDiskCache(CLEANUP_THRESHOLD); + + verify(diskCachingTier, never()).keys(); + } + + @Test + public void shouldNotCleanDiskCacheWhenCleanupKeysPercentageIsBelowThreshold() { + final int DISK_CACHE_COUNT = 1; + final double CLEANUP_THRESHOLD = 49.0; + + when(tieredCacheService.getDiskCachingTier()).thenReturn( + (Optional>) Optional.of(diskCachingTier) + ); + when(diskCachingTier.count()).thenReturn(DISK_CACHE_COUNT); + + indicesRequestCache.cleanDiskCache(CLEANUP_THRESHOLD); + + verify(diskCachingTier, never()).keys(); + } + + @Test + public void cleanDiskCacheWhenCleanupKeysPercentageIsGreaterThanOrEqualToThreshold() { + final int DISK_CACHE_COUNT = 100; + final double CLEANUP_THRESHOLD = 50.0; + + // Mock dependencies + IndicesService mockIndicesService = mock(IndicesService.class); + TieredCacheService mockTieredCacheService = mock(TieredCacheService.class); + DiskCachingTier mockDiskCachingTier = mock(DiskCachingTier.class); + Iterator mockIterator = mock(Iterator.class); + Iterable mockIterable = () -> mockIterator; + + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + mockIndicesService, + mockTieredCacheService + ); + + // Set up mocks + when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier)); + when(mockDiskCachingTier.count()).thenReturn(DISK_CACHE_COUNT); + when(mockDiskCachingTier.keys()).thenReturn(mockIterable); + when(mockIterator.hasNext()).thenReturn(true, true, false); + + // Create mock Keys and return them when next() is called + CacheEntity mockEntity = mock(CacheEntity.class); + Key firstMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId1"); + Key secondMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId2"); + when(mockEntity.getCacheIdentity()).thenReturn(new Object()); + when(mockIterator.next()).thenReturn(firstMockKey, secondMockKey); + + cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId"); + 
cache.cleanDiskCache(CLEANUP_THRESHOLD); + + // Verify interactions + verify(mockDiskCachingTier).keys(); + verify(mockIterator, atLeastOnce()).remove(); + } + + @Test + public void diskCleanupKeysPercentageWhenDiskCacheIsEmpty() { + when(tieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(diskCachingTier)); + when(diskCachingTier.count()).thenReturn(0); + + double result = indicesRequestCache.diskCleanupKeysPercentage(); + + assertEquals(0, result, 0); + } + + @Test + public void diskCleanupKeysPercentageWhenKeysToCleanIsEmpty() { + IndicesService mockIndicesService = mock(IndicesService.class); + TieredCacheService mockTieredCacheService = mock(TieredCacheSpilloverStrategyService.class); + DiskCachingTier mockDiskCachingTier = mock(DiskCachingTier.class); + when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier)); + when(mockDiskCachingTier.count()).thenReturn(100); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + mockIndicesService, + mockTieredCacheService + ); + + double result = cache.diskCleanupKeysPercentage(); + + assertEquals(0, result, 0.001); + } + + @Test + public void diskCleanupKeysPercentageWhenDiskCacheAndKeysToCleanAreNotEmpty() { + IndicesService mockIndicesService = mock(IndicesService.class); + TieredCacheService mockTieredCacheService = mock(TieredCacheSpilloverStrategyService.class); + DiskCachingTier mockDiskCachingTier = mock(DiskCachingTier.class); + when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier)); + when(mockDiskCachingTier.count()).thenReturn(100); + + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + mockIndicesService, + mockTieredCacheService + ); + + IndicesRequestCache.CacheEntity mockEntity = Mockito.mock(IndicesRequestCache.CacheEntity.class); + when(mockEntity.getCacheIdentity()).thenReturn(new Object()); + cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId"); + + double result = cache.diskCleanupKeysPercentage(); + assertEquals(1.0, result, 0); + } } } From 4500090f5adbd834bb62cbd8e6df200c87b12a8d Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 28 Nov 2023 21:46:54 -0800 Subject: [PATCH 03/21] register INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING in clustersettings.java --- .../java/org/opensearch/common/settings/ClusterSettings.java | 3 ++- .../src/main/java/org/opensearch/indices/IndicesService.java | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5e3f4e959648e..0b0dfcc63c422 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -688,11 +688,12 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, + IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING, + IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, 
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING, DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING ) ) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 497458fd5927e..c92b9170bec35 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -242,11 +242,10 @@ public class IndicesService extends AbstractLifecycleComponent public static final Setting INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING = Setting.simpleString( - INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY, + "cluster.request_cache.disk.cleanup_threshold_percentage", INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_DEFAULT_VALUE, value -> { - String errorLogBase = "The value of the setting " + - INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING_KEY + + String errorLogBase = "Setting 'cluster.request_cache.disk.cleanup_threshold_percentage' " + " must be "; if (!value.endsWith("%")) { throw new IllegalArgumentException(errorLogBase + "a percentage"); From 4199bc2726235456e5b5422eaf4e836f25c2c5ed Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 28 Nov 2023 21:48:34 -0800 Subject: [PATCH 04/21] make INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING dynamic property --- server/src/main/java/org/opensearch/indices/IndicesService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index c92b9170bec35..253238c96e642 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -230,6 +230,7 @@ public class IndicesService extends AbstractLifecycleComponent public static final Setting INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting( INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING_KEY, TimeValue.timeValueMinutes(1), + Property.Dynamic, Property.NodeScope ); From 6e05b9dce6a355ee8e571da34ccaf3ee6109577a Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 28 Nov 2023 21:49:44 -0800 Subject: [PATCH 05/21] minor changes --- .../indices/IndicesRequestCache.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index fb7662a57a6de..9fb015e5ca695 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -452,7 +452,7 @@ synchronized void cleanCache() { } cleanUpKeys( - tieredCacheService.getOnHeapCachingTier().keys().iterator(), + tieredCacheService.getOnHeapCachingTier(), currentKeysToClean, currentFullClean ); @@ -481,7 +481,10 @@ synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) { if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { return; } - cleanUpKeys(diskCachingTier.keys().iterator(), currentKeysToClean, currentFullClean); + cleanUpKeys( + tieredCacheService.getOnHeapCachingTier(), + currentKeysToClean, + currentFullClean); }); } @@ -495,16 +498,21 @@ synchronized double diskCleanupKeysPercentage() { return ((double) keysToClean.size() / totalKeysInDiskCache) * 100; } - synchronized void 
cleanUpKeys(Iterator iterator, Set currentKeysToClean, Set currentFullClean) { + synchronized void cleanUpKeys( + CachingTier cachingTier, + Set currentKeysToClean, + Set currentFullClean + ) { + Iterator iterator = cachingTier.keys().iterator(); while (iterator.hasNext()) { Key key = iterator.next(); CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); if (currentFullClean.contains(key.entity.getCacheIdentity())) { - iterator.remove(); + cachingTier.invalidate(key); currentFullClean.remove(key.entity.getCacheIdentity()); keysToClean.remove(cleanupKey); // since a key could be either in onHeap or disk cache. } else if (currentKeysToClean.contains(cleanupKey)) { - iterator.remove(); + cachingTier.invalidate(key); currentKeysToClean.remove(cleanupKey); keysToClean.remove(cleanupKey); } From aa3c39c2bac0d260070d05fdf74ffaed26539f3b Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 28 Nov 2023 22:26:21 -0800 Subject: [PATCH 06/21] change the way we calculate disk key staleness --- .../indices/IndicesRequestCache.java | 89 ++++++++++++++++++- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 9fb015e5ca695..2f8d4fe867a59 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -65,6 +65,8 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; import java.io.Closeable; import java.io.IOException; @@ -75,6 +77,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; /** @@ -119,6 +122,9 @@ public final class IndicesRequestCache implements TieredCacheEventListener registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); private final Set keysToClean = ConcurrentCollections.newConcurrentSet(); + // A map to keep track of the number of keys to be cleaned for a given ShardId and readerCacheKeyId + private final ConcurrentMap> diskCleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); + private final AtomicInteger staleKeysInDiskCount = new AtomicInteger(0); private final ByteSizeValue size; private final TimeValue expire; private final TieredCacheService tieredCacheService; @@ -196,7 +202,9 @@ public void close() { } void clear(CacheEntity entity) { - keysToClean.add(new CleanupKey(entity, null)); + CleanupKey cleanupKey = new CleanupKey(entity, null); + keysToClean.add(cleanupKey); + updateStaleKeysInDiskCount(cleanupKey); cleanCache(); } @@ -218,8 +226,71 @@ public void onHit(Key key, CacheValue cacheValue) { @Override public void onCached(Key key, BytesReference value, TierType tierType) { key.entity.onCached(key, value, tierType); + updateDiskCleanupKeyToCountMap(new CleanupKey(key.entity, key.readerCacheKeyId), tierType); } + /** + * Updates the diskCleanupKeyToCountMap with the given CleanupKey and TierType. + * If the TierType is not DISK, the method returns without making any changes. + * If the ShardId associated with the CleanupKey does not exist in the map, a new entry is created. + * The method increments the count of the CleanupKey in the map. 
+ * + * Why use ShardID as the key ? + * CacheEntity mainly contains IndexShard, both of these classes do not override equals() and hashCode() methods. + * ShardID class properly overrides equals() and hashCode() methods. + * Therefore, to avoid modifying CacheEntity and IndexShard classes to override these methods, we use ShardID as the key. + * + * @param cleanupKey the CleanupKey to be updated in the map + * @param tierType the TierType of the CleanupKey + */ + private void updateDiskCleanupKeyToCountMap(CleanupKey cleanupKey, TierType tierType) { + if(!tierType.equals(TierType.DISK)) { + return; + } + IndexShard indexShard = (IndexShard)cleanupKey.entity.getCacheIdentity(); + ShardId shardId = indexShard.shardId(); + + diskCleanupKeyToCountMap + .computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) + .merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); + } + + /** + * Updates the count of stale keys in the disk cache. + * This method is called when a CleanupKey is added to the keysToClean set. + * It increments the staleKeysInDiskCount by the count of the CleanupKey in the diskCleanupKeyToCountMap. + * If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysInDiskCount + * by the total count of keys associated with the CleanupKey's ShardId in the diskCleanupKeyToCountMap and removes the ShardId from the map. + * + * @param cleanupKey the CleanupKey that has been marked for cleanup + */ + private void updateStaleKeysInDiskCount(CleanupKey cleanupKey) { + IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + ShardId shardId = indexShard.shardId(); + + ConcurrentMap countMap = diskCleanupKeyToCountMap.get(shardId); + if (countMap == null) { + return; + } + + if (cleanupKey.readerCacheKeyId == null) { + int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum(); + staleKeysInDiskCount.addAndGet(totalSum); + diskCleanupKeyToCountMap.remove(shardId); + return; + } + Integer count = countMap.get(cleanupKey.readerCacheKeyId); + if (count == null) { + return; + } + staleKeysInDiskCount.addAndGet(count); + countMap.remove(cleanupKey.readerCacheKeyId); + if (countMap.isEmpty()) { + diskCleanupKeyToCountMap.remove(shardId); + } + } + + BytesReference getOrCompute( CacheEntity cacheEntity, CheckedSupplier loader, @@ -414,6 +485,7 @@ public void onClose(IndexReader.CacheKey cacheKey) { Boolean remove = registeredClosedListeners.remove(this); if (remove != null) { keysToClean.add(this); + updateStaleKeysInDiskCount(new CleanupKey(this.entity, this.readerCacheKeyId)); } } @@ -482,7 +554,7 @@ synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) { return; } cleanUpKeys( - tieredCacheService.getOnHeapCachingTier(), + tieredCacheService.getDiskCachingTier().get(), currentKeysToClean, currentFullClean); }); @@ -492,10 +564,10 @@ synchronized double diskCleanupKeysPercentage() { int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier() .map(CachingTier::count) .orElse(0); - if (totalKeysInDiskCache == 0 || keysToClean.isEmpty()) { + if (totalKeysInDiskCache == 0 || staleKeysInDiskCount.get() == 0) { return 0; } - return ((double) keysToClean.size() / totalKeysInDiskCache) * 100; + return ((double) staleKeysInDiskCount.get() / totalKeysInDiskCache) * 100; } synchronized void cleanUpKeys( @@ -543,19 +615,28 @@ long count() { return tieredCacheService.count(); } + // to be used for testing only int numRegisteredCloseListeners() { // for testing return 
registeredClosedListeners.size(); } + // to be used for testing only void addCleanupKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing keysToClean.add(new CleanupKey(entity, readerCacheKeyId)); } + // to be used for testing only int getKeysToCleanSizeForTesting() { // for testing return keysToClean.size(); } + // to be used for testing only Key createKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing return new Key(entity, null, readerCacheKeyId); } + + // to be used for testing only + void setStaleKeysInDiskCountForTesting(int count) { + staleKeysInDiskCount.set(count); + } } From 83d70f74ceed288bd66f855c60712511bf5f454a Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 28 Nov 2023 22:27:57 -0800 Subject: [PATCH 07/21] fix breaking tests --- .../indices/IndicesRequestCacheTests.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index b323cccc16290..367b0d082f3d5 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -47,15 +47,14 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; import org.opensearch.action.OriginalIndices; import org.opensearch.action.OriginalIndicesTests; import org.opensearch.action.search.SearchRequest; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.UUIDs; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.opensearch.common.CheckedSupplier; import org.opensearch.common.cache.tier.DiskCachingTier; import org.opensearch.common.cache.tier.TieredCacheService; import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService; @@ -67,7 +66,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.AbstractBytesReference; -import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -79,14 +77,14 @@ import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesRequestCache.CacheEntity; +import org.opensearch.indices.IndicesRequestCache.Key; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; -import org.opensearch.indices.IndicesRequestCache.CacheEntity; -import org.opensearch.indices.IndicesRequestCache.Key; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; @@ -100,7 +98,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.internal.verification.VerificationModeFactory.atLeastOnce; +import static 
org.mockito.internal.verification.VerificationModeFactory.times; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { @@ -700,6 +698,7 @@ public void shouldNotCleanDiskCacheWhenCleanupKeysPercentageIsBelowThreshold() { public void cleanDiskCacheWhenCleanupKeysPercentageIsGreaterThanOrEqualToThreshold() { final int DISK_CACHE_COUNT = 100; final double CLEANUP_THRESHOLD = 50.0; + final int STALE_KEYS_IN_DISK_COUNT = 51; // Mock dependencies IndicesService mockIndicesService = mock(IndicesService.class); @@ -728,11 +727,12 @@ public void cleanDiskCacheWhenCleanupKeysPercentageIsGreaterThanOrEqualToThresho when(mockIterator.next()).thenReturn(firstMockKey, secondMockKey); cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId"); + cache.setStaleKeysInDiskCountForTesting(STALE_KEYS_IN_DISK_COUNT); cache.cleanDiskCache(CLEANUP_THRESHOLD); // Verify interactions verify(mockDiskCachingTier).keys(); - verify(mockIterator, atLeastOnce()).remove(); + verify(mockIterator, times(2)).next(); } @Test @@ -780,6 +780,7 @@ public void diskCleanupKeysPercentageWhenDiskCacheAndKeysToCleanAreNotEmpty() { IndicesRequestCache.CacheEntity mockEntity = Mockito.mock(IndicesRequestCache.CacheEntity.class); when(mockEntity.getCacheIdentity()).thenReturn(new Object()); cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId"); + cache.setStaleKeysInDiskCountForTesting(1); double result = cache.diskCleanupKeysPercentage(); assertEquals(1.0, result, 0); From caae29191a14d97d5d83f4cee041fff8788fd172 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 30 Nov 2023 13:24:49 -0800 Subject: [PATCH 08/21] Fix test to include IndexShard --- .../indices/IndicesRequestCacheTests.java | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 367b0d082f3d5..d5987956b73b1 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -92,7 +92,6 @@ import java.util.Iterator; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -101,6 +100,12 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { + public IndexShard getIndexShardCache() { + IndexShard indexShard = mock(IndexShard.class); + ShardId shardId = mock(ShardId.class); + when(indexShard.shardId()).thenReturn(shardId); + return indexShard; + } public void testBasicOperationsCache() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -113,7 +118,7 @@ public void testBasicOperationsCache() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndexShard indexShard = getIndexShardCache(); // initial cache TestEntity entity = new TestEntity(requestCacheStats, indexShard); @@ -143,7 +148,7 @@ public void testBasicOperationsCache() throws Exception { if (randomBoolean()) { reader.close(); } else { - 
indexShard.set(false); // closed shard but reader is still open + entity.setIsOpen(false); cache.clear(entity); } cache.cleanCache(); @@ -161,7 +166,7 @@ public void testBasicOperationsCache() throws Exception { public void testCacheDifferentReaders() throws Exception { ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class), dummyClusterSettings); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndexShard indexShard = getIndexShardCache(); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -239,7 +244,7 @@ public void testCacheDifferentReaders() throws Exception { if (randomBoolean()) { secondReader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + entity.setIsOpen(false); cache.clear(secondEntity); } cache.cleanCache(); @@ -258,7 +263,7 @@ public void testEviction() throws Exception { ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); { IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class), dummyClusterSettings); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndexShard indexShard = getIndexShardCache(); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -287,7 +292,7 @@ public void testEviction() throws Exception { getInstanceFromNode(IndicesService.class), dummyClusterSettings ); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndexShard indexShard = getIndexShardCache(); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -316,7 +321,7 @@ public void testEviction() throws Exception { logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirdEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); - assertEquals(2, cache.count()); + assertEquals(2, requestCacheStats.stats().getEntries()); assertEquals(1, requestCacheStats.stats().getEvictions()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); } @@ -324,7 +329,7 @@ public void testEviction() throws Exception { public void testClearAllEntityIdentity() throws Exception { ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class), dummyClusterSettings); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndexShard indexShard = getIndexShardCache(); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -344,7 +349,7 @@ public void testClearAllEntityIdentity() throws Exception { writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - AtomicBoolean differentIdentity = new AtomicBoolean(true); + IndexShard differentIdentity = getIndexShardCache(); TestEntity thirdEntity = new 
TestEntity(requestCacheStats, differentIdentity); Loader thirdLoader = new Loader(thirdReader, 0); @@ -417,7 +422,7 @@ public void testInvalidate() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndexShard indexShard = getIndexShardCache(); // initial cache TestEntity entity = new TestEntity(requestCacheStats, indexShard); @@ -461,7 +466,7 @@ public void testInvalidate() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + entity.setIsOpen(false); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -476,8 +481,8 @@ public void testInvalidate() throws Exception { } public void testEqualsKey() throws IOException { - AtomicBoolean trueBoolean = new AtomicBoolean(true); - AtomicBoolean falseBoolean = new AtomicBoolean(false); + IndexShard trueBoolean = getIndexShardCache(); + IndexShard falseBoolean = getIndexShardCache(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); @@ -617,10 +622,11 @@ public boolean isFragment() { } private class TestEntity extends AbstractIndexShardCacheEntity { - private final AtomicBoolean standInForIndexShard; + private final IndexShard standInForIndexShard; private final ShardRequestCache shardRequestCache; + private boolean isOpen = true; - private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) { + private TestEntity(ShardRequestCache shardRequestCache, IndexShard standInForIndexShard) { this.standInForIndexShard = standInForIndexShard; this.shardRequestCache = shardRequestCache; } @@ -632,7 +638,11 @@ protected ShardRequestCache stats() { @Override public boolean isOpen() { - return standInForIndexShard.get(); + return this.isOpen; + } + + public void setIsOpen(boolean isOpen) { + this.isOpen = isOpen; } @Override From 9d0eab145f3354016d9541a3898d45ad6a19cbc1 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 30 Nov 2023 13:25:14 -0800 Subject: [PATCH 09/21] UT for testing invalidate of DiskTier is called --- .../indices/IndicesRequestCacheTests.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index d5987956b73b1..780a49a13fb34 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -56,6 +56,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.UUIDs; import org.opensearch.common.cache.tier.DiskCachingTier; +import org.opensearch.common.cache.tier.EhCacheDiskCachingTier; import org.opensearch.common.cache.tier.TieredCacheService; import org.opensearch.common.cache.tier.TieredCacheSpilloverStrategyService; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -93,6 +94,7 @@ import java.util.Optional; import java.util.UUID; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import 
static org.mockito.Mockito.verify; @@ -745,6 +747,48 @@ public void cleanDiskCacheWhenCleanupKeysPercentageIsGreaterThanOrEqualToThresho verify(mockIterator, times(2)).next(); } + @Test + public void cleanDiskCacheAndCallInvalidateOfDiskTier() { + final int DISK_CACHE_COUNT = 100; + final double CLEANUP_THRESHOLD = 50.0; + final int STALE_KEYS_IN_DISK_COUNT = 51; + + // Mock dependencies + IndicesService mockIndicesService = mock(IndicesService.class); + TieredCacheService mockTieredCacheService = mock(TieredCacheService.class); + DiskCachingTier mockDiskCachingTier = mock(EhCacheDiskCachingTier.class); + Iterator mockIterator = mock(Iterator.class); + Iterable mockIterable = () -> mockIterator; + + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + mockIndicesService, + mockTieredCacheService + ); + + // Set up mocks + when(mockTieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(mockDiskCachingTier)); + when(mockDiskCachingTier.count()).thenReturn(DISK_CACHE_COUNT); + when(mockDiskCachingTier.keys()).thenReturn(mockIterable); + when(mockIterator.hasNext()).thenReturn(true, true, false); + + // Create mock Keys and return them when next() is called + CacheEntity mockEntity = mock(CacheEntity.class); + Key firstMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId1"); + Key secondMockKey = cache.createKeyForTesting(mockEntity, "readerCacheKeyId2"); + when(mockEntity.getCacheIdentity()).thenReturn(new Object()); + when(mockIterator.next()).thenReturn(firstMockKey, secondMockKey); + + cache.addCleanupKeyForTesting(mockEntity, "readerCacheKeyId"); + cache.setStaleKeysInDiskCountForTesting(STALE_KEYS_IN_DISK_COUNT); + cache.cleanDiskCache(CLEANUP_THRESHOLD); + + // Verify interactions + verify(mockDiskCachingTier).keys(); + verify(mockDiskCachingTier, times(1)).invalidate(any(IndicesRequestCache.Key.class)); + verify(mockIterator, times(2)).next(); + } + @Test public void diskCleanupKeysPercentageWhenDiskCacheIsEmpty() { when(tieredCacheService.getDiskCachingTier()).thenReturn(Optional.of(diskCachingTier)); From 5c472aed03d5f9c43cbd119db917a5104d3d3bf3 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 5 Dec 2023 07:11:27 -0800 Subject: [PATCH 10/21] Introduce CleanupStatus to keysToClean --- .../opensearch/indices/IndicesRequestCache.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 2f8d4fe867a59..910365014117e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -121,7 +121,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); - private final Set keysToClean = ConcurrentCollections.newConcurrentSet(); + private final Map keysToClean = ConcurrentCollections.newConcurrentMap(); // A map to keep track of the number of keys to be cleaned for a given ShardId and readerCacheKeyId private final ConcurrentMap> diskCleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); private final AtomicInteger staleKeysInDiskCount = new AtomicInteger(0); @@ -203,7 +203,7 @@ public void close() { void clear(CacheEntity entity) { CleanupKey cleanupKey = new CleanupKey(entity, null); - keysToClean.add(cleanupKey); + keysToClean.put(cleanupKey, new CleanupStatus()); 
updateStaleKeysInDiskCount(cleanupKey); cleanCache(); } @@ -370,6 +370,11 @@ public BytesReference load(Key key) throws Exception { } } + public class CleanupStatus { + public boolean cleanedInHeap; + public boolean cleanedOnDisk; + } + /** * Basic interface to make this cache testable. */ @@ -484,8 +489,8 @@ private CleanupKey(CacheEntity entity, String readerCacheKeyId) { public void onClose(IndexReader.CacheKey cacheKey) { Boolean remove = registeredClosedListeners.remove(this); if (remove != null) { - keysToClean.add(this); - updateStaleKeysInDiskCount(new CleanupKey(this.entity, this.readerCacheKeyId)); + keysToClean.put(this, new CleanupStatus()); + updateStaleKeysInDiskCount(this); } } @@ -622,7 +627,7 @@ int numRegisteredCloseListeners() { // for testing // to be used for testing only void addCleanupKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing - keysToClean.add(new CleanupKey(entity, readerCacheKeyId)); + keysToClean.put(new CleanupKey(entity, readerCacheKeyId), new CleanupStatus()); } // to be used for testing only From be10a165d1fe717d573a9af399a334d8ac941faa Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 5 Dec 2023 07:11:55 -0800 Subject: [PATCH 11/21] Use CleanupStatus to track per-tier cleanup and update stale key entries --- .../indices/IndicesRequestCache.java | 94 +++++++++++++------ 1 file changed, 66 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 910365014117e..c71f874c3e5bf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -73,7 +73,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -206,6 +206,12 @@ void clear(CacheEntity entity) { keysToClean.put(cleanupKey, new CleanupStatus()); updateStaleKeysInDiskCount(cleanupKey); cleanCache(); + /* + This is triggered by the cache clear API call. + We need to clean the disk cache as well, + hence we pass a threshold of 0. + */ + cleanDiskCache(0); } @Override @@ -290,7 +296,6 @@ private void updateStaleKeysInDiskCount(CleanupKey cleanupKey) { } } - BytesReference getOrCompute( CacheEntity cacheEntity, CheckedSupplier loader, @@ -520,8 +525,33 @@ public int hashCode() { synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); + /* + Stores the keys that need to be removed from keysToClean + This is done to avoid ConcurrentModificationException + */ + final Set keysCleanedFromAllCaches = new HashSet<>(); + + for (Map.Entry entry : keysToClean.entrySet()) { + CleanupKey cleanupKey = entry.getKey(); + CleanupStatus cleanupStatus = entry.getValue(); + + if (cleanupStatus.cleanedInHeap && cleanupStatus.cleanedOnDisk) { + keysCleanedFromAllCaches.add(cleanupKey); + continue; + } + + if (cleanupStatus.cleanedInHeap) continue; + + if (needsFullClean(cleanupKey)) { + currentFullClean.add(cleanupKey.entity.getCacheIdentity()); + } else { + currentKeysToClean.add(cleanupKey); + } + cleanupStatus.cleanedInHeap = true; + } - categorizeKeysForCleanup(currentKeysToClean, currentFullClean); + // Remove keys that have been cleaned from all caches + keysToClean.keySet().removeAll(keysCleanedFromAllCaches); //
Early exit if no cleanup is needed if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { @@ -549,10 +579,37 @@ synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) { return; } } - Set currentKeysToClean = new HashSet<>(); - Set currentFullClean = new HashSet<>(); + // Categorize keys to be cleaned into currentKeysToClean and currentFullClean + final Set currentKeysToClean = new HashSet<>(); + final Set currentFullClean = new HashSet<>(); + + /* + Stores the keys that need to be removed from keysToClean + This is done to avoid ConcurrentModificationException + */ + final Set keysCleanedFromAllCaches = new HashSet<>(); + + for (Map.Entry entry : keysToClean.entrySet()) { + CleanupKey cleanupKey = entry.getKey(); + CleanupStatus cleanupStatus = entry.getValue(); + + if (cleanupStatus.cleanedInHeap && cleanupStatus.cleanedOnDisk) { + keysCleanedFromAllCaches.add(cleanupKey); + continue; + } + + if (cleanupStatus.cleanedOnDisk) continue; + + if (needsFullClean(cleanupKey)) { + currentFullClean.add(cleanupKey.entity.getCacheIdentity()); + } else { + currentKeysToClean.add(cleanupKey); + } + cleanupStatus.cleanedOnDisk = true; + } - categorizeKeysForCleanup(currentKeysToClean, currentFullClean); + // Remove keys that have been cleaned from all caches + keysToClean.keySet().removeAll(keysCleanedFromAllCaches); // Early exit if no cleanup is needed if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { @@ -580,30 +637,11 @@ synchronized void cleanUpKeys( Set currentKeysToClean, Set currentFullClean ) { - Iterator iterator = cachingTier.keys().iterator(); - while (iterator.hasNext()) { - Key key = iterator.next(); + for (Key key : cachingTier.keys()) { CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { + if (currentFullClean.contains(key.entity.getCacheIdentity()) || currentKeysToClean.contains(cleanupKey)) { cachingTier.invalidate(key); - currentFullClean.remove(key.entity.getCacheIdentity()); - keysToClean.remove(cleanupKey); // since a key could be either in onHeap or disk cache. 
- } else if (currentKeysToClean.contains(cleanupKey)) { - cachingTier.invalidate(key); - currentKeysToClean.remove(cleanupKey); - keysToClean.remove(cleanupKey); - } - } - } - - private void categorizeKeysForCleanup(Set currentKeysToClean, Set currentFullClean) { - Iterator iterator = keysToClean.iterator(); - while (iterator.hasNext()) { - CleanupKey cleanupKey = iterator.next(); - if (needsFullClean(cleanupKey)) { - currentFullClean.add(cleanupKey.entity.getCacheIdentity()); - } else { - currentKeysToClean.add(cleanupKey); + staleKeysInDiskCount.decrementAndGet(); } } } From 4c8b2408264162d28f98eaff08ff473156f1fc6f Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 5 Dec 2023 07:12:20 -0800 Subject: [PATCH 12/21] register INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING to cluster settings --- .../java/org/opensearch/common/settings/ClusterSettings.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 0b0dfcc63c422..d7856f6cad98e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -690,6 +690,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING, IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING, + IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, From eeb973edfbc7f27817955f956bfaa32aa82e904c Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 5 Dec 2023 07:12:47 -0800 Subject: [PATCH 13/21] Add removal listener to update eh cache stats --- .../common/cache/tier/EhCacheDiskCachingTier.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index f8aedbe5314de..170331d102959 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -326,6 +326,16 @@ public void onEvent(CacheEvent event) { assert event.getNewValue() == null; break; case REMOVED: + this.removalListener.ifPresent( + listener -> listener.onRemoval( + new RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.INVALIDATED, + TierType.DISK + ) + ) + ); count.dec(); assert event.getNewValue() == null; break; From f22cdafa204893585ba21d80aa9e179eafba298d Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 5 Dec 2023 07:13:00 -0800 Subject: [PATCH 14/21] re-organize imports --- .../cache/tier/EhCacheDiskCachingTier.java | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index 170331d102959..39fe6d5377490 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ 
b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -8,7 +8,21 @@ package org.opensearch.common.cache.tier; +import org.ehcache.Cache; +import org.ehcache.CachePersistenceException; +import org.ehcache.PersistentCacheManager; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import org.ehcache.config.units.MemoryUnit; import org.ehcache.core.spi.service.FileBasedPersistenceContext; +import org.ehcache.event.CacheEvent; +import org.ehcache.event.CacheEventListener; +import org.ehcache.event.EventType; +import org.ehcache.expiry.ExpiryPolicy; +import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; import org.ehcache.spi.serialization.SerializerException; import org.opensearch.OpenSearchException; import org.opensearch.common.cache.RemovalListener; @@ -21,35 +35,14 @@ import org.opensearch.common.unit.TimeValue; import java.io.File; -import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; -import org.ehcache.Cache; -import org.ehcache.CachePersistenceException; -import org.ehcache.PersistentCacheManager; -import org.ehcache.config.builders.CacheConfigurationBuilder; -import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; -import org.ehcache.config.builders.CacheManagerBuilder; -import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; -import org.ehcache.config.builders.ResourcePoolsBuilder; -import org.ehcache.config.units.MemoryUnit; -import org.ehcache.event.CacheEvent; -import org.ehcache.event.CacheEventListener; -import org.ehcache.event.EventType; -import org.ehcache.expiry.ExpiryPolicy; -import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.search.query.QuerySearchResult; - /** * An ehcache-based disk tier implementation. 
* @param The key type of cache entries From 280cb071ddade98ace91de6fe9c14c6856c6853d Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 5 Dec 2023 07:13:09 -0800 Subject: [PATCH 15/21] Add IT tests --- .../IndicesRequestCacheDiskTierIT.java | 254 +++++++++++++++++- 1 file changed, 253 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java index 4cb6eb1e3c09f..77808c2a88a68 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -45,12 +45,14 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; +import java.time.Duration; +import java.time.Instant; + import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; // This is a separate file from IndicesRequestCacheIT because we only want to run our test // on a node with a maximum request cache size that we set. - @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase { public void testDiskTierStats() throws Exception { @@ -110,7 +112,257 @@ public void testDiskTierStats() throws Exception { IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 3, TierType.ON_HEAP, false); IndicesRequestCacheIT.assertCacheState(client, "index", 2, numRequests + 1, TierType.DISK, false); assertDiskTierSpecificStats(client, "index", 2, tookTimeSoFar, tookTimeSoFar); + } + + public void testDiskTierInvalidationByCleanCacheAPI() throws Exception { + int cleanupIntervalInMillis = 10_000_000; // setting this intentionally high so that we don't get background cleanups + int heapSizeBytes = 9876; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(cleanupIntervalInMillis)) + .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "0%") + .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) + .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time + ); + Client client = client(node); + + Settings.Builder indicesSettingBuilder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + + assertAcked( + client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get() + ); + + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp; + + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); + int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); + assertTrue(heapSizeBytes > requestSize); + // If this fails, increase heapSizeBytes! 
We can't adjust it after getting the size of one query + // as the cache size setting is not dynamic + int numOnDisk = 2; + int numRequests = heapSizeBytes / requestSize + numOnDisk; + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + for (int i = 1; i < numRequests; i++) { + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + assertSearchResponse(resp); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + } + + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + + long entries = requestCacheStats.getEntries(TierType.DISK); + // make sure we have 2 entries in disk. + assertEquals(2, entries); + + // call clear cache api + client.admin().indices().prepareClearCache().setIndices("index").setRequestCache(true).get(); + // fetch the stats again + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + entries = requestCacheStats.getEntries(TierType.DISK); + // make sure we have 0 entries in disk. + assertEquals(0, entries); + } + + // When entire disk tier is stale, test whether cache cleaner cleans up everything from disk + public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Exception { + int thresholdInMillis = 4_000; + Instant start = Instant.now(); + int heapSizeBytes = 9876; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis)) + .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%") + .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) + .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time + ); + Client client = client(node); + + Settings.Builder indicesSettingBuilder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + + assertAcked( + client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get() + ); + + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp; + + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); + int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); + assertTrue(heapSizeBytes > requestSize); + + int numOnDisk = 2; + int numRequests = heapSizeBytes / requestSize + numOnDisk; + for (int i = 1; i < numRequests; i++) { + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + assertSearchResponse(resp); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); + 
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + } + + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + + // make sure we have 2 entries in disk. + long entries = requestCacheStats.getEntries(TierType.DISK); + assertEquals(2, entries); + + // index a doc and force refresh so that the cache cleaner can clean the cache + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + + // sleep for the threshold time, so that the cache cleaner can clean the cache + Instant end = Instant.now(); + long elapsedTimeMillis = Duration.between(start, end).toMillis(); + // if this test is flaky, increase the sleep time. + long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_000; + Thread.sleep(sleepTime); + + // by now cache cleaner would have run and cleaned up stale keys + // fetch the stats again + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + + // make sure we have 0 entries in disk. + entries = requestCacheStats.getEntries(TierType.DISK); + assertEquals(0, entries); + } + + // When part of disk tier is stale, test whether cache cleaner cleans up only stale items from disk + public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Exception { + int thresholdInMillis = 4_000; + Instant start = Instant.now(); + int heapSizeBytes = 987; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis)) + .put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%") + .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) + .put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time + ); + Client client = client(node); + + Settings.Builder indicesSettingBuilder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + + assertAcked( + client.admin().indices().prepareCreate("index").setMapping("k", "type=text").setSettings(indicesSettingBuilder).get() + ); + + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp; + + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); + int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); + assertTrue(heapSizeBytes > requestSize); + + int numOnDisk = 2; + int numRequests = heapSizeBytes / requestSize + numOnDisk; + for (int i = 1; i < numRequests; i++) { + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + assertSearchResponse(resp); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + } + + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + + // make sure we have 2 entries in 
disk. + long entries = requestCacheStats.getEntries(TierType.DISK); + assertEquals(2, entries); + + // force refresh so that it creates stale keys in the cache for the cache cleaner to pick up. + flushAndRefresh("index"); + client().prepareIndex("index").setId("1").setSource("k", "good bye"); + ensureSearchable("index"); + + for (int i = 0; i < 6; i++) { // index 5 items with the new readerCacheKeyId + client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + } + + // fetch the stats again + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + + entries = requestCacheStats.getEntries(TierType.DISK); + + // sleep for the threshold time, so that the cache cleaner can clean the cache + Instant end = Instant.now(); + long elapsedTimeMillis = Duration.between(start, end).toMillis(); + // if this test is flaky, increase the sleep time. + long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 5_000; + Thread.sleep(sleepTime); + + // fetch the stats again + requestCacheStats = client.admin() + .indices() + .prepareStats("index") + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + // make sure we have 5 entries in disk. + entries = requestCacheStats.getEntries(TierType.DISK); + assertEquals(5, entries); } private long getCacheSizeBytes(Client client, String index, TierType tierType) { From 8f0b5bcb3f80427166da1f34f04494c3172831ec Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Dec 2023 17:17:28 -0800 Subject: [PATCH 16/21] refactor cleanCache & cleanDiskCache to share methods --- .../indices/IndicesRequestCache.java | 114 +++++++++--------- 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 600b12a78f3f7..44af6361e95ee 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -526,12 +526,29 @@ public int hashCode() { * Logic to clean up in-memory cache. */ synchronized void cleanCache() { + cleanCache(TierType.ON_HEAP); + tieredCacheService.getOnHeapCachingTier().refresh(); + } + + /** + * Logic to clean up disk based cache. + *
<p>
+ * TODO: cleanDiskCache is very specific to disk caching tier. We can refactor this to be more generic. + */ + synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) { + if (!canSkipDiskCacheCleanup(diskCachesCleanThresholdPercent)) { + cleanCache(TierType.DISK); + } + } + + private synchronized void cleanCache(TierType cacheType) { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); - /* - Stores the keys that need to be removed from keysToClean - This is done to avoid ConcurrentModificationException - */ + + /* + Stores the keys that need to be removed from keysToClean + This is done to avoid ConcurrentModificationException + */ final Set keysCleanedFromAllCaches = new HashSet<>(); for (Map.Entry entry : keysToClean.entrySet()) { @@ -543,14 +560,20 @@ synchronized void cleanCache() { continue; } - if (cleanupStatus.cleanedInHeap) continue; + if (cacheType == TierType.ON_HEAP && cleanupStatus.cleanedInHeap) continue; + if (cacheType == TierType.DISK && cleanupStatus.cleanedOnDisk) continue; if (needsFullClean(cleanupKey)) { currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { currentKeysToClean.add(cleanupKey); } - cleanupStatus.cleanedInHeap = true; + + if (cacheType == TierType.ON_HEAP) { + cleanupStatus.cleanedInHeap = true; + } else if (cacheType == TierType.DISK) { + cleanupStatus.cleanedOnDisk = true; + } } // Remove keys that have been cleaned from all caches @@ -561,68 +584,41 @@ synchronized void cleanCache() { return; } + CachingTier cachingTier; + + if (cacheType == TierType.ON_HEAP) { + cachingTier = tieredCacheService.getOnHeapCachingTier(); + } else { + cachingTier = tieredCacheService.getDiskCachingTier().get(); + } + cleanUpKeys( - tieredCacheService.getOnHeapCachingTier(), + cachingTier, currentKeysToClean, currentFullClean ); - tieredCacheService.getOnHeapCachingTier().refresh(); } - /** - * Logic to clean up disk based cache. - *
<p>
- * TODO: cleanDiskCache is very specific to disk caching tier. We can refactor this to be more generic. - */ - synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) { - tieredCacheService.getDiskCachingTier().ifPresent(diskCachingTier -> { - if (diskCachingTier.count() == 0 || diskCleanupKeysPercentage() < diskCachesCleanThresholdPercent) { - if (logger.isDebugEnabled()) { - logger.debug("Skipping disk cache keys cleanup"); - return; - } + private synchronized boolean canSkipDiskCacheCleanup(double diskCachesCleanThresholdPercent) { + if (tieredCacheService.getDiskCachingTier().isEmpty()) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping disk cache keys cleanup since disk caching tier is not present"); } - // Categorize keys to be cleaned into currentKeysToClean and currentFullClean - final Set currentKeysToClean = new HashSet<>(); - final Set currentFullClean = new HashSet<>(); - - /* - Stores the keys that need to be removed from keysToClean - This is done to avoid ConcurrentModificationException - */ - final Set keysCleanedFromAllCaches = new HashSet<>(); - - for (Map.Entry entry : keysToClean.entrySet()) { - CleanupKey cleanupKey = entry.getKey(); - CleanupStatus cleanupStatus = entry.getValue(); - - if (cleanupStatus.cleanedInHeap && cleanupStatus.cleanedOnDisk) { - keysCleanedFromAllCaches.add(cleanupKey); - continue; - } - - if (cleanupStatus.cleanedOnDisk) continue; - - if (needsFullClean(cleanupKey)) { - currentFullClean.add(cleanupKey.entity.getCacheIdentity()); - } else { - currentKeysToClean.add(cleanupKey); - } - cleanupStatus.cleanedOnDisk = true; + return true; + } + if (tieredCacheService.getDiskCachingTier().get().count() == 0) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping disk cache keys cleanup since disk caching tier is empty"); } - - // Remove keys that have been cleaned from all caches - keysToClean.keySet().removeAll(keysCleanedFromAllCaches); - - // Early exit if no cleanup is needed - if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { - return; + return true; + } + if (diskCleanupKeysPercentage() < diskCachesCleanThresholdPercent) { + if (logger.isDebugEnabled()) { + logger.debug("Skipping disk cache keys cleanup since the percentage of stale keys in disk cache is less than the threshold"); } - cleanUpKeys( - tieredCacheService.getDiskCachingTier().get(), - currentKeysToClean, - currentFullClean); - }); + return true; + } + return false; } synchronized double diskCleanupKeysPercentage() { From 8c996b3a52a26e18072a60114570cde05de56a13 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Dec 2023 17:17:42 -0800 Subject: [PATCH 17/21] null checks for indexShard --- .../java/org/opensearch/indices/IndicesRequestCache.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 44af6361e95ee..9a1a1c58c2b97 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -257,6 +257,11 @@ private void updateDiskCleanupKeyToCountMap(CleanupKey cleanupKey, TierType tier return; } IndexShard indexShard = (IndexShard)cleanupKey.entity.getCacheIdentity(); + if(indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {} while cleaning tier : {}", + cleanupKey.readerCacheKeyId, tierType.getStringValue()); + return; + } ShardId shardId = indexShard.shardId(); 
diskCleanupKeyToCountMap @@ -275,6 +280,10 @@ private void updateDiskCleanupKeyToCountMap(CleanupKey cleanupKey, TierType tier */ private void updateStaleKeysInDiskCount(CleanupKey cleanupKey) { IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (indexShard == null) { + logger.warn("IndexShard is null for CleanupKey: {}", cleanupKey.readerCacheKeyId); + return; + } ShardId shardId = indexShard.shardId(); ConcurrentMap countMap = diskCleanupKeyToCountMap.get(shardId); From 4bd1503ce7c18f9066e84d1ee4ec599ffbb5a5d6 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Dec 2023 17:18:03 -0800 Subject: [PATCH 18/21] Fix stale keys percentage calculation and disk-only stale count decrement --- .../java/org/opensearch/indices/IndicesRequestCache.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 9a1a1c58c2b97..227751a668c3d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -637,7 +637,7 @@ synchronized double diskCleanupKeysPercentage() { if (totalKeysInDiskCache == 0 || staleKeysInDiskCount.get() == 0) { return 0; } - return ((double) staleKeysInDiskCount.get() / totalKeysInDiskCache) * 100; + return ((double) staleKeysInDiskCount.get() / totalKeysInDiskCache); } synchronized void cleanUpKeys( @@ -649,7 +649,9 @@ synchronized void cleanUpKeys( CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); if (currentFullClean.contains(key.entity.getCacheIdentity()) || currentKeysToClean.contains(cleanupKey)) { cachingTier.invalidate(key); - staleKeysInDiskCount.decrementAndGet(); + if (cachingTier.getTierType() == TierType.DISK) { + staleKeysInDiskCount.decrementAndGet(); + } } } } From 1911fa65f630b54ec4ddfbf89271fdb97ac76481 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Dec 2023 17:18:55 -0800 Subject: [PATCH 19/21] use assertNumCacheEntries --- .../IndicesRequestCacheDiskTierIT.java | 97 ++----------------- 1 file changed, 6 insertions(+), 91 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java index bbd4ab6e9975d..f85b06ceeac44 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -147,52 +147,18 @@ public void testDiskTierInvalidationByCleanCacheAPI() throws Exception { // as the cache size setting is not dynamic int numOnDisk = 2; int numRequests = heapSizeBytes / requestSize + numOnDisk; - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); for (int i = 1; i < numRequests; i++) { - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); assertSearchResponse(resp); IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); } - - requestCacheStats = client.admin() -
.indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - - long entries = requestCacheStats.getEntries(TierType.DISK); - // make sure we have 2 entries in disk. - assertEquals(2, entries); + IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK); // call clear cache api client.admin().indices().prepareClearCache().setIndices("index").setRequestCache(true).get(); // fetch the stats again - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - entries = requestCacheStats.getEntries(TierType.DISK); - // make sure we have 0 entries in disk. - assertEquals(0, entries); + IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK); } // When entire disk tier is stale, test whether cache cleaner cleans up everything from disk @@ -235,17 +201,7 @@ public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Except IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); } - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - - // make sure we have 2 entries in disk. - long entries = requestCacheStats.getEntries(TierType.DISK); - assertEquals(2, entries); + IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK); // index a doc and force refresh so that the cache cleaner can clean the cache indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); @@ -260,17 +216,7 @@ public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Except // by now cache cleaner would have run and cleaned up stale keys // fetch the stats again - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - - // make sure we have 0 entries in disk. - entries = requestCacheStats.getEntries(TierType.DISK); - assertEquals(0, entries); + IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK); } // When part of disk tier is stale, test whether cache cleaner cleans up only stale items from disk @@ -313,17 +259,7 @@ public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Except IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); } - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - - // make sure we have 2 entries in disk. - long entries = requestCacheStats.getEntries(TierType.DISK); - assertEquals(2, entries); + IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK); // force refresh so that it creates stale keys in the cache for the cache cleaner to pick up. 
flushAndRefresh("index"); @@ -334,17 +270,6 @@ public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Except client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); } - // fetch the stats again - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - - entries = requestCacheStats.getEntries(TierType.DISK); - // sleep for the threshold time, so that the cache cleaner can clean the cache Instant end = Instant.now(); long elapsedTimeMillis = Duration.between(start, end).toMillis(); // if this test is flaky, increase the sleep time. @@ -352,18 +277,8 @@ public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Except long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 5_000; Thread.sleep(sleepTime); - // fetch the stats again - requestCacheStats = client.admin() - .indices() - .prepareStats("index") - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - // make sure we have 5 entries in disk. - entries = requestCacheStats.getEntries(TierType.DISK); - assertEquals(5, entries); + IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 5, TierType.DISK); } private long getCacheSizeBytes(Client client, String index, TierType tierType) { From ae0b47118dafccfeb51a10ad54268d3a6459e4dc Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Dec 2023 17:19:14 -0800 Subject: [PATCH 20/21] fix item count in test comment --- .../org/opensearch/indices/IndicesRequestCacheDiskTierIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java index f85b06ceeac44..475121d0967f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -266,7 +266,7 @@ public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Except client().prepareIndex("index").setId("1").setSource("k", "good bye"); ensureSearchable("index"); - for (int i = 0; i < 6; i++) { // index 5 items with the new readerCacheKeyId + for (int i = 0; i < 6; i++) { // index 6 items with the new readerCacheKeyId client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); } From fce110d94095f1c107d1cd6f911a62c5642f6764 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Thu, 14 Dec 2023 17:19:32 -0800 Subject: [PATCH 21/21] adjust test sleep buffers to 1.5s --- .../org/opensearch/indices/IndicesRequestCacheDiskTierIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java index 475121d0967f7..77e299497c9a5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -211,7 +211,7 @@ public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Except Instant end = Instant.now(); long elapsedTimeMillis = Duration.between(start, end).toMillis(); // if this test is flaky, increase the sleep time.
- long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_000; + long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_500; Thread.sleep(sleepTime); // by now cache cleaner would have run and cleaned up stale keys @@ -274,7 +274,7 @@ public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Except Instant end = Instant.now(); long elapsedTimeMillis = Duration.between(start, end).toMillis(); // if this test is flaky, increase the sleep time. - long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 5_000; + long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_500; Thread.sleep(sleepTime); // make sure we have 5 entries in disk.
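
Note: the decision exercised throughout this series is whether the disk tier holds enough stale keys to justify a cleanup pass. The sketch below restates that gate outside the diffs for readers following along. It is illustrative only: shouldCleanDiskCache, staleKeyCount, and totalKeyCount are stand-in names rather than the actual members of IndicesRequestCache, and whether the threshold is compared as a fraction or a percentage must match how INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING is parsed (PATCH 18 above drops the * 100, moving the computation from a percentage to a fraction).

    // Illustrative sketch of the disk tier cleanup gate; names are stand-ins.
    final class DiskCleanupGate {
        // Returns true when the stale portion of the disk tier meets the
        // configured threshold, mirroring the canSkipDiskCacheCleanup checks.
        static boolean shouldCleanDiskCache(int staleKeyCount, int totalKeyCount, double threshold) {
            if (totalKeyCount == 0 || staleKeyCount == 0) {
                return false; // empty tier or no stale keys: nothing to clean
            }
            double staleRatio = (double) staleKeyCount / totalKeyCount;
            return staleRatio >= threshold; // fraction form, matching PATCH 18
        }
    }

A forced cleanup, such as the clear cache API path added in PATCH 11, bypasses this gate by passing a threshold of 0, which any nonzero stale ratio satisfies.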