Skip to content

Commit

Permalink
Move + rename PluggableQueryCache files
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
Peter Alfonsi committed Dec 11, 2024
1 parent fd91441 commit 9e0549f
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 132 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.query;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.store.Directory;
import org.opensearch.cache.common.tier.MockDiskCache;
import org.opensearch.cache.common.tier.TieredSpilloverCache;
import org.opensearch.cache.common.tier.TieredSpilloverCachePlugin;
import org.opensearch.cache.common.tier.TieredSpilloverCacheSettings;
import org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder;
import org.opensearch.cache.common.tier.TieredSpilloverCacheTests;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.module.CacheModule;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.indices.query.DummyQuery;
import org.opensearch.indices.query.PluggableQueryCache;
import org.opensearch.node.Node;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.opensearch.indices.query.PluggableQueryCache.SHARD_ID_DIMENSION_NAME;

public class PluggableQueryCacheTSCTests extends OpenSearchSingleNodeTestCase {

private ThreadPool threadPool;

private ThreadPool getThreadPool() {
return new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default tracer tests").build());
}

@After
public void cleanup() throws IOException {
terminate(threadPool);
}

public void testBasics_WithTSC_WithSmallHeapSize() throws Exception {
// TODO: Check all the logic works when TSC is innerCache and can only fit a few keys into its heap tier (aka test the serializers
// work.)
threadPool = getThreadPool();
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w);
w.close();
ShardId shard = new ShardId("index", "_na_", 0);
r = OpenSearchDirectoryReader.wrap(r, shard);
IndexSearcher s = new IndexSearcher(r);
s.setQueryCachingPolicy(alwaysCachePolicy());

PluggableQueryCache cache = getQueryCache(getTSCSettings(1000));
s.setQueryCache(cache);

ICache<PluggableQueryCache.CompositeKey, PluggableQueryCache.CacheAndCount> innerCache = cache.getInnerCache();
assertTrue(innerCache instanceof TieredSpilloverCache);

testBasicsDummyQuery(cache, s, shard);

// Explicitly check disk cache had items and hits
TieredSpilloverCache<PluggableQueryCache.CompositeKey, PluggableQueryCache.CacheAndCount> tsc = (TieredSpilloverCache<
PluggableQueryCache.CompositeKey,
PluggableQueryCache.CacheAndCount>) cache.getInnerCache();
ImmutableCacheStats diskTierStats = TieredSpilloverCacheTests.getStatsSnapshotForTier(
tsc,
TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK,
List.of(SHARD_ID_DIMENSION_NAME),
List.of(shard.toString())
);
assertTrue(diskTierStats.getItems() > 0);
assertTrue(diskTierStats.getHits() > 0);

cache.close();
IOUtils.close(r, dir);
}

// Duplicated from TieredQueryCacheTests.java
private void testBasicsDummyQuery(PluggableQueryCache cache, IndexSearcher s, ShardId shard) throws IOException {
checkStats(cache.getStats(shard), 0, 0, 0, 0, false);

assertEquals(1, s.count(new DummyQuery(0)));
checkStats(cache.getStats(shard), 1, 1, 0, 2, true);

int numEntries = 20;
for (int i = 1; i < numEntries; ++i) {
assertEquals(1, s.count(new DummyQuery(i)));
}
checkStats(cache.getStats(shard), 10, numEntries, 0, 2 * numEntries, true);

s.count(new DummyQuery(1)); // Pick 1 so the hit comes from disk
checkStats(cache.getStats(shard), 10, numEntries, 1, 2 * numEntries, true);
}

private void checkStats(
QueryCacheStats stats,
long expectedSize,
long expectedCount,
long expectedHits,
long expectedMisses,
boolean checkMemoryAboveZero
) {
// assertEquals(expectedSize, stats.getCacheSize());
assertEquals(expectedCount, stats.getCacheCount());
assertEquals(expectedHits, stats.getHitCount());
assertEquals(expectedMisses, stats.getMissCount());
if (checkMemoryAboveZero) {
assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
}
}

private Settings getTSCSettings(int heapBytes) {
return Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_QUERY_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
heapBytes + "b"
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
1
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.build();
}

private static QueryCachingPolicy alwaysCachePolicy() {
return new QueryCachingPolicy() {
@Override
public void onUse(Query query) {}

@Override
public boolean shouldCache(Query query) {
return true;
}
};
}

private PluggableQueryCache getQueryCache(Settings settings) throws IOException {
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
clusterService.getClusterSettings()
.registerSetting(TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_QUERY_CACHE));
return new PluggableQueryCache(
new CacheModule(List.of(new TieredSpilloverCachePlugin(settings), new MockDiskCachePlugin()), settings).getCacheService(),
settings,
clusterService,
env
);
}
}

// Duplicated from TieredSpilloverCacheIT.java
public static class MockDiskCachePlugin extends Plugin implements CachePlugin {

public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
}

@Override
public String getName() {
return "mock_disk_plugin";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
IndicesService.CLUSTER_REPLICATION_TYPE_SETTING,
IndicesService.USE_PLUGGABLE_QUERY_CACHE,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
Metadata.SETTING_READ_ONLY_SETTING,
Expand Down
16 changes: 15 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.query.PluggableQueryCache;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -310,6 +311,12 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);

public static final Setting<Boolean> USE_PLUGGABLE_QUERY_CACHE = Setting.boolSetting(
"indices.queries.cache.use_pluggable",
false,
Property.NodeScope
);

/**
* The node's settings.
*/
Expand Down Expand Up @@ -417,7 +424,14 @@ public IndicesService(
}
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
}), cacheService, threadPool, clusterService, nodeEnv);
this.indicesQueryCache = new IndicesQueryCache(settings);

boolean usePluggableQueryCache = USE_PLUGGABLE_QUERY_CACHE.get(settings);
if (usePluggableQueryCache) {
this.indicesQueryCache = new PluggableQueryCache(cacheService, settings, clusterService, nodeEnv);
} else {
this.indicesQueryCache = new IndicesQueryCache(settings);
}

this.mapperRegistry = mapperRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
indexingMemoryController = new IndexingMemoryController(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.cache.common.query;
package org.opensearch.indices.query;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ConstantScoreScorer;
Expand All @@ -30,7 +30,8 @@ public class DummyQuery extends Query {
// And to do this, the serializer must know about DummyQuery, so it can't live in test module

private final int id;
DummyQuery(int id) {

public DummyQuery(int id) {
this.id = id;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.cache.common.query;
package org.opensearch.indices.query;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -58,7 +58,6 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.indices.OpenSearchQueryCache;
import org.opensearch.search.aggregations.bucket.composite.CompositeKey;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -128,7 +127,7 @@ Looks like LRUQC controls the eviction logic, and just tells LeafCache to remove
/**
* A pluggable version of the query cache which uses an ICache internally to store values. Proof of concept only! Incomplete
*/
public class TieredQueryCache implements QueryCache, OpenSearchQueryCache {
public class PluggableQueryCache implements QueryCache, OpenSearchQueryCache {

private final ICache<CompositeKey, CacheAndCount> innerCache; // This should typically be a TieredSpilloverCache but theoretically can
// be anything - for testing purposes
Expand All @@ -146,7 +145,7 @@ public class TieredQueryCache implements QueryCache, OpenSearchQueryCache {
private final ShardCoreKeyMap shardKeyMap = new ShardCoreKeyMap();
private final Map<String, LongAdder> outerCacheMissCounts;

private static final Logger logger = LogManager.getLogger(TieredQueryCache.class);
private static final Logger logger = LogManager.getLogger(PluggableQueryCache.class);

/**
* The shard id dimension name.
Expand All @@ -162,7 +161,12 @@ public class TieredQueryCache implements QueryCache, OpenSearchQueryCache {
* @param clusterService Cluster service.
* @param nodeEnvironment Node env.
*/
public TieredQueryCache(CacheService cacheService, Settings settings, ClusterService clusterService, NodeEnvironment nodeEnvironment) {
public PluggableQueryCache(
CacheService cacheService,
Settings settings,
ClusterService clusterService,
NodeEnvironment nodeEnvironment
) {

// Following IQC, hardcode leavesToCache and skipFactor
this.leavesToCache = context -> true;
Expand Down Expand Up @@ -342,8 +346,8 @@ private String getShardIdName(Object readerCoreKey) {
return shardKeyMap.getShardId(readerCoreKey).toString();
}

// pkg-private for testing
ICache<CompositeKey, CacheAndCount> getInnerCache() {
// for testing
public ICache<CompositeKey, CacheAndCount> getInnerCache() {
return innerCache;
}

Expand Down Expand Up @@ -618,7 +622,7 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
}
}

static class CompositeKey implements Accountable {
public static class CompositeKey implements Accountable {
final int leafCacheId;
final Query query;

Expand Down Expand Up @@ -677,7 +681,7 @@ ICacheKey<CompositeKey> getFinalKey(Query query, String shardIdName) {
/**
* Doc ids and count for a query.
*/
protected static class CacheAndCount implements Accountable {
public static class CacheAndCount implements Accountable {
/**
* An empty CacheAndCount.
*/
Expand Down
Loading

0 comments on commit 9e0549f

Please sign in to comment.