Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Stale Keys from Disk AND RBM #13

Merged
merged 22 commits into from
Dec 19, 2023
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
52b9dfd
Update IndicesRequestCache.java
kiranprakash154 Nov 14, 2023
5bab65d
More updates to disk cache cleanup
kiranprakash154 Nov 17, 2023
4500090
register INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING in cluste…
kiranprakash154 Nov 29, 2023
4199bc2
make INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING dynamic property
kiranprakash154 Nov 29, 2023
6e05b9d
minor changes
kiranprakash154 Nov 29, 2023
aa3c39c
change the way we calculate disk key staleness
kiranprakash154 Nov 29, 2023
83d70f7
fix breaking tests
kiranprakash154 Nov 29, 2023
caae291
Fix test to include IndexShard
kiranprakash154 Nov 30, 2023
9d0eab1
UT for testing invalidate of DiskTier is called
kiranprakash154 Nov 30, 2023
5c472ae
Introduce CleanupStatus to keysToClean
kiranprakash154 Dec 5, 2023
be10a16
use that cleanupStatus and logic to update staleKeyEntries
kiranprakash154 Dec 5, 2023
4c8b240
register INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING to cluster…
kiranprakash154 Dec 5, 2023
eeb973e
Add removal listener to update eh cache stats
kiranprakash154 Dec 5, 2023
f22cdaf
re-organize imports
kiranprakash154 Dec 5, 2023
280cb07
Add IT tests
kiranprakash154 Dec 5, 2023
27f122b
Merge branch 'feature/tiered-caching' into kp/delete-stale-keys
kiranprakash154 Dec 5, 2023
8f0b5bc
refactor cleanCache & cleanDiskCache to share methods
kiranprakash154 Dec 15, 2023
8c996b3
null checks for indexShard
kiranprakash154 Dec 15, 2023
4bd1503
some bugs i found
kiranprakash154 Dec 15, 2023
1911fa6
use assertNumCacheEntries
kiranprakash154 Dec 15, 2023
ae0b471
update print statement
kiranprakash154 Dec 15, 2023
fce110d
edit sleep time
kiranprakash154 Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More updates to disk cache cleanup
  • Loading branch information
kiranprakash154 committed Nov 28, 2023
commit 5bab65d449900c89b0a756c48a154d7a116129d3
138 changes: 84 additions & 54 deletions server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
Original file line number Diff line number Diff line change
@@ -42,8 +42,9 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.tier.BytesReferenceSerializer;
import org.opensearch.common.cache.tier.CachePolicyInfoWrapper;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.CacheValue;
import org.opensearch.common.cache.tier.CachingTier;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.EhCacheDiskCachingTier;
import org.opensearch.common.cache.tier.OnHeapCachingTier;
import org.opensearch.common.cache.tier.OpenSearchOnHeapCache;
@@ -64,7 +65,6 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.search.query.QuerySearchResult;

import java.io.Closeable;
import java.io.IOException;
@@ -174,6 +174,19 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
this.indicesService = indicesService;
}

// added for testing
IndicesRequestCache(
Settings settings,
IndicesService indicesService,
TieredCacheService<Key, BytesReference> tieredCacheService
) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
this.indicesService = indicesService;
this.tieredCacheService = tieredCacheService;
}

@Override
public void close() {
tieredCacheService.invalidateAll();
@@ -238,9 +251,10 @@ BytesReference getOrCompute(

/**
* Invalidates the given the cache entry for the given key and it's context
*
* @param cacheEntity the cache entity to invalidate for
* @param reader the reader to invalidate the cache entry for
* @param cacheKey the cache key to invalidate
* @param reader the reader to invalidate the cache entry for
* @param cacheKey the cache key to invalidate
*/
void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
assert reader.getReaderCacheHelper() != null;
@@ -426,83 +440,87 @@ public int hashCode() {
/**
* Logic to clean up in-memory cache.
*/
synchronized void cleanCache() { // TODO rename this method to plural or cleanTieredCache ?
synchronized void cleanCache() {
final Set<CleanupKey> currentKeysToClean = new HashSet<>();
final Set<Object> currentFullClean = new HashSet<>();

Iterator<CleanupKey> iterator = keysToClean.iterator();
while(iterator.hasNext()) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (needsFullClean(cleanupKey)) {
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
currentKeysToClean.add(cleanupKey);
}
}
categorizeKeysForCleanup(currentKeysToClean, currentFullClean);

// Early exit if no cleanup is needed
if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) {
return;
}
cleanTieredCaches(currentKeysToClean, currentFullClean);

cleanUpKeys(
tieredCacheService.getOnHeapCachingTier().keys().iterator(),
currentKeysToClean,
currentFullClean
);
tieredCacheService.getOnHeapCachingTier().refresh();
}

private void cleanTieredCaches(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
cleanOnHeapCache(currentKeysToClean, currentFullClean);
cleanDiskCache(currentKeysToClean, currentFullClean);
/**
* Logic to clean up disk based cache.
* <p>
* TODO: cleanDiskCache is very specific to disk caching tier. We can refactor this to be more generic.
*/
synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) {
tieredCacheService.getDiskCachingTier().ifPresent(diskCachingTier -> {
if (diskCachingTier.count() == 0 || diskCleanupKeysPercentage() < diskCachesCleanThresholdPercent) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping disk cache keys cleanup");
return;
}
}
Set<CleanupKey> currentKeysToClean = new HashSet<>();
Set<Object> currentFullClean = new HashSet<>();

categorizeKeysForCleanup(currentKeysToClean, currentFullClean);

// Early exit if no cleanup is needed
if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) {
return;
}
cleanUpKeys(diskCachingTier.keys().iterator(), currentKeysToClean, currentFullClean);
});
}

// keeping this unsynchronized since we expect it to be called only by cleanCache, which is synchronized
private void cleanDiskCache(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
if (tieredCacheService.getDiskCachingTier().isEmpty()) {
logger.debug("Skipping disk cache keys cleanup since no disk cache is configured");
return;
}
final double cleanupKeysThresholdPercentage = 50.0; // TODO make this an index setting
int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier().get().count();
int totalKeysToCleanup = currentKeysToClean.size() + currentFullClean.size();

double cleanupKeysPercentage = ((double) totalKeysToCleanup / totalKeysInDiskCache) * 100;
if (cleanupKeysPercentage < cleanupKeysThresholdPercentage) {
logger.debug("Skipping disk cache keys cleanup since the keys to cleanup of {}% is not greater than " +
"the threshold percentage of {}%", cleanupKeysPercentage, cleanupKeysThresholdPercentage);
return;
synchronized double diskCleanupKeysPercentage() {
int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier()
.map(CachingTier::count)
.orElse(0);
if (totalKeysInDiskCache == 0 || keysToClean.isEmpty()) {
return 0;
}
return ((double) keysToClean.size() / totalKeysInDiskCache) * 100;
}

Iterator<Key> iterator = tieredCacheService.getDiskCachingTier().get().keys().iterator();
synchronized void cleanUpKeys(Iterator<Key> iterator, Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
while (iterator.hasNext()) {
Key key = iterator.next();
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
currentFullClean.remove(key.entity.getCacheIdentity());
} else {
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
if (currentKeysToClean.contains(cleanupKey)) {
iterator.remove();
currentKeysToClean.remove(cleanupKey);
}
keysToClean.remove(cleanupKey); // since a key could be either in onHeap or disk cache.
} else if (currentKeysToClean.contains(cleanupKey)) {
iterator.remove();
currentKeysToClean.remove(cleanupKey);
keysToClean.remove(cleanupKey);
}
}
}

// keeping this unsynchronized since we expect it to be called only by cleanCache, which is synchronized
private void cleanOnHeapCache(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
Iterator<Key> iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator();
private void categorizeKeysForCleanup(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
Iterator<CleanupKey> iterator = keysToClean.iterator();
while (iterator.hasNext()) {
Key key = iterator.next();
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
currentFullClean.remove(key.entity.getCacheIdentity());
CleanupKey cleanupKey = iterator.next();
if (needsFullClean(cleanupKey)) {
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
if (currentKeysToClean.contains(cleanupKey)) {
iterator.remove();
currentKeysToClean.remove(cleanupKey);
}
currentKeysToClean.add(cleanupKey);
}
}
tieredCacheService.getOnHeapCachingTier().refresh();
}

private boolean needsFullClean(CleanupKey cleanupKey) {
@@ -520,4 +538,16 @@ long count() {
int numRegisteredCloseListeners() { // for testing
return registeredClosedListeners.size();
}

void addCleanupKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing
keysToClean.add(new CleanupKey(entity, readerCacheKeyId));
}

int getKeysToCleanSizeForTesting() { // for testing
return keysToClean.size();
}

Key createKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing
return new Key(entity, null, readerCacheKeyId);
}
}
Loading