From df8b26eac975363e8d3a6209d3534e0efaa9f765 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 29 Sep 2023 09:42:13 -0700 Subject: [PATCH 01/16] [Tiered Caching] Enabling serialization for IndicesRequestCache key object Signed-off-by: Sagar Upadhyaya --- .../indices/IndicesRequestCacheIT.java | 40 +++++++++++ .../index/OpenSearchDirectoryReader.java | 52 +++++++++++++- .../indices/IndicesRequestCache.java | 72 +++++++++++++------ .../opensearch/indices/IndicesService.java | 21 ++++-- .../indices/IndicesRequestCacheTests.java | 72 ++++++++++++++----- .../indices/IndicesServiceCloseTests.java | 7 ++ 6 files changed, 222 insertions(+), 42 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 98a22717019cf..a1815d9be2daf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception { } } + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, "index", 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, "index", 1, 1); + + // Explicit refresh would invalidate cache + refresh(); + // Hit same query again + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, "index", 1, 2); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = client.admin() .indices() @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } } diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 77609822d3d90..e5038436012dd 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -39,6 +39,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.UUID; /** * A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes @@ -51,11 +52,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper wrapper; + private DelegatingCacheHelper delegatingCacheHelper; + private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException { super(in, wrapper); this.wrapper = wrapper; this.shardId = shardId; + this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper()); } /** @@ -68,7 +72,53 @@ public ShardId shardId() { @Override public CacheHelper getReaderCacheHelper() { // safe to delegate since this reader does not alter the index - return in.getReaderCacheHelper(); + return this.delegatingCacheHelper; + } + + public DelegatingCacheHelper getDelegatingCacheHelper() { + return this.delegatingCacheHelper; + } + + public class DelegatingCacheHelper implements CacheHelper { + CacheHelper cacheHelper; + DelegatingCacheKey serializableCacheKey; + + DelegatingCacheHelper(CacheHelper cacheHelper) { + this.cacheHelper = cacheHelper; + this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey()); + } + + @Override + public CacheKey getKey() { + return this.cacheHelper.getKey(); + } + + public DelegatingCacheKey getDelegatingCacheKey() { + return this.serializableCacheKey; + } + + @Override + public void addClosedListener(ClosedListener listener) { + this.cacheHelper.addClosedListener(listener); + } + } + + public class DelegatingCacheKey { + CacheKey cacheKey; + private final UUID uniqueId; + + DelegatingCacheKey(CacheKey cacheKey) { + this.cacheKey = cacheKey; + this.uniqueId = UUID.randomUUID(); + } + + public CacheKey getCacheKey() { + return this.cacheKey; + } + + public UUID getId() { + return uniqueId; + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..25a067bdc8d0c 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -51,6 +51,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import java.io.Closeable; @@ -108,8 +111,9 @@ public final class IndicesRequestCache implements RemovalListener cache; + private final IndicesService indicesService; - IndicesRequestCache(Settings settings) { + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -121,6 +125,7 @@ public final class IndicesRequestCache implements RemovalListener { + protected static class Loader implements CacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -207,7 +225,7 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable { + interface CacheEntity extends Accountable, Writeable { /** * Called after the value was loaded. @@ -240,6 +258,7 @@ interface CacheEntity extends Accountable { * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -247,17 +266,23 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + class Key implements Accountable, Writeable { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + public final String readerCacheKeyUniqueId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; + this.readerCacheKeyUniqueId = Objects.requireNonNull(readerCacheKeyUniqueId); + } + + Key(StreamInput in) throws IOException { + this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.readerCacheKeyUniqueId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override @@ -276,7 +301,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, key.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -285,19 +310,26 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + result = 31 * result + readerCacheKeyUniqueId.hashCode(); result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(entity); + out.writeOptionalString(readerCacheKeyUniqueId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyUniqueId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyUniqueId = readerCacheKeyUniqueId; } @Override @@ -315,7 +347,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, that.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -323,7 +355,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyUniqueId); return result; } } @@ -336,7 +368,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyUniqueId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -349,7 +381,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyUniqueId))) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..f5e71327b6e7b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -391,7 +391,7 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesRequestCache = new IndicesRequestCache(settings, this); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,14 +1746,21 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); + public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } + public IndexShardCacheEntity(StreamInput in) throws IOException { + Index index = in.readOptionalWriteable(Index::new); + int shardId = in.readVInt(); + IndexService indexService = indices.get(index.getUUID()); + this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); + } + @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1775,6 +1782,12 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(indexShard.shardId().getIndex()); + out.writeVInt(indexShard.shardId().id()); + } } @FunctionalInterface diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8494259c8fd8a..664865f21f3a8 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,23 +52,28 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexService; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -public class IndicesRequestCacheTests extends OpenSearchTestCase { +public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -122,7 +127,7 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -218,7 +223,7 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -244,7 +249,8 @@ public void testEviction() throws Exception { IOUtils.close(reader, secondReader, writer, dir, cache); } IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build() + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), + null ); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -281,7 +287,7 @@ public void testEviction() throws Exception { } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -366,7 +372,7 @@ public BytesReference get() { public void testInvalidate() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -435,20 +441,23 @@ public void testInvalidate() throws Exception { public void testEqualsKey() throws IOException { AtomicBoolean trueBoolean = new AtomicBoolean(true); AtomicBoolean falseBoolean = new AtomicBoolean(false); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); - IndexReader reader1 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + ShardId shardId = new ShardId("foo", "bar", 1); + IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); writer.addDocument(new Document()); - IndexReader reader2 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -459,6 +468,32 @@ public void testEqualsKey() throws IOException { assertNotEquals(key1, key5); } + public void testSerializationDeserializationOfCacheKey() throws Exception { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + ShardRequestCache shardRequestCache = new ShardRequestCache(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; + IndexService indexService = createIndex("test"); + IndexShard indexShard = indexService.getShard(0); + IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard); + String readerCacheKeyId = UUID.randomUUID().toString(); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId); + BytesReference bytesReference = null; + try (BytesStreamOutput out = new BytesStreamOutput()) { + key1.writeTo(out); + bytesReference = out.bytes(); + } + StreamInput in = bytesReference.streamInput(); + + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); + + assertEquals(readerCacheKeyId, key2.readerCacheKeyUniqueId); + assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); + assertEquals(termBytes, key2.value); + + } + private class TestBytesReference extends AbstractBytesReference { int dummyValue; @@ -538,5 +573,8 @@ public Object getCacheIdentity() { public long ramBytesUsed() { return 42; } + + @Override + public void writeTo(StreamOutput out) throws IOException {} } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 415844dccb611..364c7a94cad54 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; @@ -59,6 +60,7 @@ import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.opensearch.transport.nio.MockNioTransportPlugin; +import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; @@ -315,6 +317,11 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(0L, cache.count()); IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + @Override public long ramBytesUsed() { return 42; From 93c1172af25e0c37d78a251c3eb36c5c0bbdd14f Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 29 Sep 2023 10:28:53 -0700 Subject: [PATCH 02/16] Fixing javadoc issue Signed-off-by: Sagar Upadhyaya --- .../common/lucene/index/OpenSearchDirectoryReader.java | 8 ++++++++ .../java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index e5038436012dd..b2e21d2076cbb 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -79,6 +79,10 @@ public DelegatingCacheHelper getDelegatingCacheHelper() { return this.delegatingCacheHelper; } + /** + * Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey. + * @opensearch.internal + */ public class DelegatingCacheHelper implements CacheHelper { CacheHelper cacheHelper; DelegatingCacheKey serializableCacheKey; @@ -103,6 +107,10 @@ public void addClosedListener(ClosedListener listener) { } } + /** + * Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of + * object itself for serialization purposes. + */ public class DelegatingCacheKey { CacheKey cacheKey; private final UUID uniqueId; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 25a067bdc8d0c..e1d18b1172865 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -198,7 +198,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - protected static class Loader implements CacheLoader { + private static class Loader implements CacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; From a03af2dfe4159ef0fad6688a1606d8a45d14c22b Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 12:27:16 -0700 Subject: [PATCH 03/16] Addressing comments Signed-off-by: Sagar Upadhyaya --- .../index/OpenSearchDirectoryReader.java | 6 +-- .../indices/IndicesRequestCache.java | 45 +++++++++---------- .../indices/IndicesRequestCacheTests.java | 6 +-- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index b2e21d2076cbb..c9da20f279a3c 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -113,18 +113,18 @@ public void addClosedListener(ClosedListener listener) { */ public class DelegatingCacheKey { CacheKey cacheKey; - private final UUID uniqueId; + private final String uniqueId; DelegatingCacheKey(CacheKey cacheKey) { this.cacheKey = cacheKey; - this.uniqueId = UUID.randomUUID(); + this.uniqueId = UUID.randomUUID().toString(); } public CacheKey getCacheKey() { return this.cacheKey; } - public UUID getId() { + public String getId() { return uniqueId; } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index e1d18b1172865..b47245f74f45a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -154,15 +154,15 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader .getReaderCacheHelper(); - String readerCacheKeyUniqueId = delegatingCacheHelper.getDelegatingCacheKey().getId().toString(); - assert readerCacheKeyUniqueId != null; - final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + assert readerCacheKeyId != null; + final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyUniqueId); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -183,14 +183,13 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - String readerCacheKeyUniqueId = null; + String readerCacheKeyId = null; if (reader instanceof OpenSearchDirectoryReader) { IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyUniqueId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() - .getId() - .toString(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() + .getId(); } - cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId)); + cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); } /** @@ -270,18 +269,18 @@ class Key implements Accountable, Writeable { private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final String readerCacheKeyUniqueId; + public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, BytesReference value, String readerCacheKeyUniqueId) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyId) { this.entity = entity; this.value = value; - this.readerCacheKeyUniqueId = Objects.requireNonNull(readerCacheKeyUniqueId); + this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); } Key(StreamInput in) throws IOException { this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); - this.readerCacheKeyUniqueId = in.readOptionalString(); + this.readerCacheKeyId = in.readOptionalString(); this.value = in.readBytesReference(); } @@ -301,7 +300,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKeyUniqueId, key.readerCacheKeyUniqueId) == false) return false; + if (Objects.equals(readerCacheKeyId, key.readerCacheKeyId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -310,7 +309,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKeyUniqueId.hashCode(); + result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; } @@ -318,18 +317,18 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(entity); - out.writeOptionalString(readerCacheKeyUniqueId); + out.writeOptionalString(readerCacheKeyId); out.writeBytesReference(value); } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final String readerCacheKeyUniqueId; + final String readerCacheKeyId; - private CleanupKey(CacheEntity entity, String readerCacheKeyUniqueId) { + private CleanupKey(CacheEntity entity, String readerCacheKeyId) { this.entity = entity; - this.readerCacheKeyUniqueId = readerCacheKeyUniqueId; + this.readerCacheKeyId = readerCacheKeyId; } @Override @@ -347,7 +346,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKeyUniqueId, that.readerCacheKeyUniqueId) == false) return false; + if (Objects.equals(readerCacheKeyId, that.readerCacheKeyId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -355,7 +354,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKeyUniqueId); + result = 31 * result + Objects.hashCode(readerCacheKeyId); return result; } } @@ -368,7 +367,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKeyUniqueId == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -381,7 +380,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyUniqueId))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { iterator.remove(); } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 664865f21f3a8..18ec013711f22 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -448,10 +448,10 @@ public void testEqualsKey() throws IOException { IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); - String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); writer.addDocument(new Document()); IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); - String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); IOUtils.close(reader1, reader2, writer, dir); IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); @@ -488,7 +488,7 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); - assertEquals(readerCacheKeyId, key2.readerCacheKeyUniqueId); + assertEquals(readerCacheKeyId, key2.readerCacheKeyId); assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); assertEquals(termBytes, key2.value); From 40cc1e2407e46188b237f2825ceb056641f12cab Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 26 Oct 2023 12:48:24 -0700 Subject: [PATCH 04/16] Adding changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + .../main/java/org/opensearch/indices/IndicesRequestCache.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b40878066960a..601934fde849e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) - Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index b47245f74f45a..d8a67507d40a3 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -186,8 +186,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference String readerCacheKeyId = null; if (reader instanceof OpenSearchDirectoryReader) { IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() - .getId(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); } cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); } From 7f2c833b9ac5c6ee68355c9d75c99033cc320c80 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Mon, 6 Nov 2023 14:08:45 -0800 Subject: [PATCH 05/16] Update server/src/main/java/org/opensearch/indices/IndicesRequestCache.java Co-authored-by: Kiran Prakash Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d8a67507d40a3..631bb1cb9398f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -299,7 +299,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKeyId, key.readerCacheKeyId) == false) return false; + if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; From 4d2b5edffbc7159463212e7c5d023557f758ae7a Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Mon, 6 Nov 2023 14:12:30 -0800 Subject: [PATCH 06/16] Update server/src/main/java/org/opensearch/indices/IndicesRequestCache.java Co-authored-by: Kiran Prakash Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 631bb1cb9398f..6acd22b1e1e10 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -366,7 +366,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { From ebeadf95cbc016cce3e8a4f61644a288fde4f6f5 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Mon, 6 Nov 2023 14:15:02 -0800 Subject: [PATCH 07/16] Update server/src/main/java/org/opensearch/indices/IndicesRequestCache.java Co-authored-by: Kiran Prakash Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6acd22b1e1e10..22a7573e135bf 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -345,7 +345,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKeyId, that.readerCacheKeyId) == false) return false; + if (!Objects.equals(readerCacheKeyId, that.readerCacheKeyId)) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } From 6acc85d493450feaa6b02cec77873d7ce5ebf86f Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 6 Nov 2023 14:16:26 -0800 Subject: [PATCH 08/16] Making DelegatingCache helper and key members private Signed-off-by: Sagar Upadhyaya --- .../common/lucene/index/OpenSearchDirectoryReader.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index c9da20f279a3c..bed61d73a3901 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -84,8 +84,8 @@ public DelegatingCacheHelper getDelegatingCacheHelper() { * @opensearch.internal */ public class DelegatingCacheHelper implements CacheHelper { - CacheHelper cacheHelper; - DelegatingCacheKey serializableCacheKey; + private CacheHelper cacheHelper; + private DelegatingCacheKey serializableCacheKey; DelegatingCacheHelper(CacheHelper cacheHelper) { this.cacheHelper = cacheHelper; @@ -112,7 +112,7 @@ public void addClosedListener(ClosedListener listener) { * object itself for serialization purposes. */ public class DelegatingCacheKey { - CacheKey cacheKey; + private CacheKey cacheKey; private final String uniqueId; DelegatingCacheKey(CacheKey cacheKey) { From 00091c3d95ed4ddcfb7474e1947a78d40174963e Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 2 Jan 2024 10:32:58 -0800 Subject: [PATCH 09/16] Fixing issue where RamUsageEstimator is getting initialized for every key. Thanks to @peteralfonsi Signed-off-by: Sagar Upadhyaya --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 4 ++-- .../src/main/java/org/opensearch/indices/IndicesService.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 22a7573e135bf..42d7d973d09b8 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -106,6 +106,8 @@ public final class IndicesRequestCache implements RemovalListener registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); private final Set keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; @@ -265,8 +267,6 @@ interface CacheEntity extends Accountable, Writeable { * @opensearch.internal */ class Key implements Accountable, Writeable { - private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); - public final CacheEntity entity; // use as identity equality public final String readerCacheKeyId; public final BytesReference value; diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index fd7635307a299..1605d639f2ffd 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -301,6 +301,8 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); + private static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); + /** * The node's settings. */ @@ -1757,7 +1759,6 @@ private BytesReference cacheShardLevelResult( * @opensearch.internal */ public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { - private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; public IndexShardCacheEntity(IndexShard indexShard) { From dc3f903ea52cbc03e1243a888f9dd8120839c999 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 2 Jan 2024 10:54:15 -0800 Subject: [PATCH 10/16] Making delegating cache helper/key members final Signed-off-by: Sagar Upadhyaya --- .../common/lucene/index/OpenSearchDirectoryReader.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 777596594d462..ce5251dd03260 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -54,7 +54,7 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper wrapper; - private DelegatingCacheHelper delegatingCacheHelper; + private final DelegatingCacheHelper delegatingCacheHelper; private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException { @@ -86,8 +86,8 @@ public DelegatingCacheHelper getDelegatingCacheHelper() { * @opensearch.internal */ public class DelegatingCacheHelper implements CacheHelper { - private CacheHelper cacheHelper; - private DelegatingCacheKey serializableCacheKey; + private final CacheHelper cacheHelper; + private final DelegatingCacheKey serializableCacheKey; DelegatingCacheHelper(CacheHelper cacheHelper) { this.cacheHelper = cacheHelper; @@ -114,7 +114,7 @@ public void addClosedListener(ClosedListener listener) { * object itself for serialization purposes. */ public class DelegatingCacheKey { - private CacheKey cacheKey; + private final CacheKey cacheKey; private final String uniqueId; DelegatingCacheKey(CacheKey cacheKey) { From f051604501de3ffdbe638df594e797382d72d504 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 5 Jan 2024 23:19:36 -0800 Subject: [PATCH 11/16] Using shardId as cache entity in IndicesRequestCache key Signed-off-by: Sagar Upadhyaya --- .../opensearch/core/index/shard/ShardId.java | 7 + .../indices/IndicesRequestCache.java | 47 ++--- .../opensearch/indices/IndicesService.java | 31 +-- .../indices/IndicesRequestCacheTests.java | 181 +++++++++--------- .../indices/IndicesServiceCloseTests.java | 23 +-- 5 files changed, 129 insertions(+), 160 deletions(-) diff --git a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java index c0abad7ed727f..1e48cf1f476da 100644 --- a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java +++ b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java @@ -32,6 +32,7 @@ package org.opensearch.core.index.shard; +import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; @@ -55,6 +56,8 @@ public class ShardId implements Comparable, ToXContentFragment, Writeab private final int shardId; private final int hashCode; + private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class); + /** * Constructs a new shard id. * @param index the index name @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException { hashCode = computeHashCode(); } + public long getBaseRamBytesUsed() { + return BASE_RAM_BYTES_USED; + } + /** * Writes this shard id to a stream. * @param out the stream to write to diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 42d7d973d09b8..bba6217c504a9 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -55,6 +55,8 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; import java.io.Closeable; import java.io.IOException; @@ -65,6 +67,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; /** * The indices request cache allows to cache a shard level request stage responses, helping with improving @@ -113,9 +116,9 @@ public final class IndicesRequestCache implements RemovalListener cache; - private final IndicesService indicesService; + private final Function cacheEntityFunction; - IndicesRequestCache(Settings settings, IndicesService indicesService) { + IndicesRequestCache(Settings settings, Function cacheEntityFunction) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -127,7 +130,7 @@ public final class IndicesRequestCache implements RemovalListener notification) { - notification.getKey().entity.onRemoval(notification); + cacheEntityFunction.apply(notification.getKey().shardId).onRemoval(notification); } BytesReference getOrCompute( - CacheEntity cacheEntity, + IndicesService.IndexShardCacheEntity cacheEntity, CheckedSupplier loader, DirectoryReader reader, BytesReference cacheKey @@ -158,11 +161,11 @@ BytesReference getOrCompute( .getReaderCacheHelper(); String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); assert readerCacheKeyId != null; - final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyId); + final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); + cacheEntity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { @@ -172,7 +175,7 @@ BytesReference getOrCompute( } } } else { - key.entity.onHit(); + cacheEntity.onHit(); } return value; } @@ -183,14 +186,14 @@ BytesReference getOrCompute( * @param reader the reader to invalidate the cache entry for * @param cacheKey the cache key to invalidate */ - void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { + void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; String readerCacheKeyId = null; if (reader instanceof OpenSearchDirectoryReader) { IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); } - cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); + cache.invalidate(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)); } /** @@ -225,7 +228,7 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable, Writeable { + interface CacheEntity extends Accountable { /** * Called after the value was loaded. @@ -266,26 +269,26 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - class Key implements Accountable, Writeable { - public final CacheEntity entity; // use as identity equality + static class Key implements Accountable, Writeable { + public final ShardId shardId; // use as identity equality public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, BytesReference value, String readerCacheKeyId) { - this.entity = entity; + Key(ShardId shardId, BytesReference value, String readerCacheKeyId) { + this.shardId = shardId; this.value = value; this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); } Key(StreamInput in) throws IOException { - this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.shardId = in.readOptionalWriteable(ShardId::new); this.readerCacheKeyId = in.readOptionalString(); this.value = in.readBytesReference(); } @Override public long ramBytesUsed() { - return BASE_RAM_BYTES_USED + entity.ramBytesUsed() + value.length(); + return BASE_RAM_BYTES_USED + shardId.getBaseRamBytesUsed() + value.length(); } @Override @@ -300,14 +303,14 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false; - if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; + if (!shardId.equals(key.shardId)) return false; if (!value.equals(key.value)) return false; return true; } @Override public int hashCode() { - int result = entity.getCacheIdentity().hashCode(); + int result = shardId.hashCode(); result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; @@ -315,7 +318,7 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(entity); + out.writeOptionalWriteable(shardId); out.writeOptionalString(readerCacheKeyId); out.writeBytesReference(value); } @@ -376,10 +379,10 @@ synchronized void cleanCache() { if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { + if (currentFullClean.contains(cacheEntityFunction.apply(key.shardId).getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { + if (currentKeysToClean.contains(new CleanupKey(cacheEntityFunction.apply(key.shardId), key.readerCacheKeyId))) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 215a13abccf78..635fe2a07ed2d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -178,6 +178,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -193,7 +194,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.common.collect.MapBuilder.newMapBuilder; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; @@ -301,8 +301,6 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); - private static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); - /** * If enabled, this setting enforces that indexes will be created with a replication type matching the cluster setting * defined in cluster.indices.replication.strategy by rejecting any request that specifies a replication type that @@ -335,7 +333,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private volatile Map indices = emptyMap(); + private volatile Map indices = new ConcurrentHashMap<>(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -411,7 +409,10 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings, this); + this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = indexServiceSafe(shardId.getIndex()); + return new IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,7 +1747,6 @@ private BytesReference cacheShardLevelResult( BytesReference cacheKey, CheckedConsumer loader ) throws Exception { - IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard); CheckedSupplier supplier = () -> { /* BytesStreamOutput allows to pass the expected size but by default uses * BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie. @@ -1763,7 +1763,7 @@ private BytesReference cacheShardLevelResult( return out.bytes(); } }; - return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey); + return indicesRequestCache.getOrCompute(new IndexShardCacheEntity(shard), supplier, reader, cacheKey); } /** @@ -1771,20 +1771,15 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + public static class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + + private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } - public IndexShardCacheEntity(StreamInput in) throws IOException { - Index index = in.readOptionalWriteable(Index::new); - int shardId = in.readVInt(); - IndexService indexService = indices.get(index.getUUID()); - this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); - } - @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1806,12 +1801,6 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(indexShard.shardId().getIndex()); - out.writeVInt(indexShard.shardId().id()); - } } @FunctionalInterface diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 18ec013711f22..3ae9e55c8ad7a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -53,27 +53,33 @@ import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; import org.opensearch.index.IndexService; +import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -81,13 +87,13 @@ public void testBasicOperationsCache() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -95,10 +101,11 @@ public void testBasicOperationsCache() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -111,7 +118,7 @@ public void testBasicOperationsCache() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -127,9 +134,12 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -146,9 +156,10 @@ public void testCacheDifferentReaders() throws Exception { DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + ShardRequestCache requestCacheStats = entity.stats(); assertEquals("foo", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); @@ -160,9 +171,10 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(1, cache.numRegisteredCloseListeners()); // cache the second - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); value = cache.getOrCompute(entity, loader, secondReader, termBytes); + requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); @@ -172,9 +184,10 @@ public void testCacheDifferentReaders() throws Exception { assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length()); assertEquals(2, cache.numRegisteredCloseListeners()); - secondEntity = new TestEntity(requestCacheStats, indexShard); + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); @@ -182,10 +195,11 @@ public void testCacheDifferentReaders() throws Exception { assertTrue(loader.loadedFromCache); assertEquals(2, cache.count()); - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(2, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -206,7 +220,7 @@ public void testCacheDifferentReaders() throws Exception { if (randomBoolean()) { secondReader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(secondEntity); } cache.cleanCache(); @@ -223,9 +237,11 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -233,27 +249,26 @@ public void testEviction() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - size = requestCacheStats.stats().getMemorySize(); + size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); } + IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - null + (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) ); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -261,36 +276,39 @@ public void testEviction() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); + logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); - assertEquals(1, requestCacheStats.stats().getEvictions()); + assertEquals(1, indexShard.requestCache().stats().getEvictions()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); - ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -298,36 +316,39 @@ public void testClearAllEntityIdentity() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - AtomicBoolean differentIdentity = new AtomicBoolean(true); - TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity); + IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0)); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); + logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); - final long hitCount = requestCacheStats.stats().getHitCount(); + RequestCacheStats requestCacheStats = entity.stats().stats(); + requestCacheStats.add(thirddEntity.stats().stats()); + final long hitCount = requestCacheStats.getHitCount(); // clear all for the indexShard Idendity even though is't still open cache.clear(randomFrom(entity, secondEntity)); cache.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); - assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount()); + requestCacheStats = entity.stats().stats(); + requestCacheStats.add(thirddEntity.stats().stats()); + assertEquals(hitCount + 1, requestCacheStats.getHitCount()); assertEquals("baz", value3.streamInput().readString()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); @@ -371,8 +392,12 @@ public BytesReference get() { } public void testInvalidate() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -380,13 +405,13 @@ public void testInvalidate() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = entity.stats(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -394,10 +419,11 @@ public void testInvalidate() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -407,11 +433,12 @@ public void testInvalidate() throws Exception { assertEquals(1, cache.numRegisteredCloseListeners()); // load again after invalidate - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); cache.invalidate(entity, reader, termBytes); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -424,7 +451,7 @@ public void testInvalidate() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -439,25 +466,25 @@ public void testInvalidate() throws Exception { } public void testEqualsKey() throws IOException { - AtomicBoolean trueBoolean = new AtomicBoolean(true); - AtomicBoolean falseBoolean = new AtomicBoolean(false); IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); + ShardId shardId1 = new ShardId("foo1", "bar1", 2); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); writer.addDocument(new Document()); IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); - IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); - IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1); - IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2); - IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey2); + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.state()).thenReturn(IndexShardState.STARTED); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(shardId1, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(shardId, new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -471,14 +498,11 @@ public void testEqualsKey() throws IOException { public void testSerializationDeserializationOfCacheKey() throws Exception { TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - ShardRequestCache shardRequestCache = new ShardRequestCache(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; IndexService indexService = createIndex("test"); IndexShard indexShard = indexService.getShard(0); - IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard); + IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); String readerCacheKeyId = UUID.randomUUID().toString(); - IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId); BytesReference bytesReference = null; try (BytesStreamOutput out = new BytesStreamOutput()) { key1.writeTo(out); @@ -486,10 +510,10 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { } StreamInput in = bytesReference.streamInput(); - IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(in); assertEquals(readerCacheKeyId, key2.readerCacheKeyId); - assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); + assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId); assertEquals(termBytes, key2.value); } @@ -544,37 +568,4 @@ public boolean isFragment() { return false; } } - - private class TestEntity extends AbstractIndexShardCacheEntity { - private final AtomicBoolean standInForIndexShard; - private final ShardRequestCache shardRequestCache; - - private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) { - this.standInForIndexShard = standInForIndexShard; - this.shardRequestCache = shardRequestCache; - } - - @Override - protected ShardRequestCache stats() { - return shardRequestCache; - } - - @Override - public boolean isOpen() { - return standInForIndexShard.get(); - } - - @Override - public Object getCacheIdentity() { - return standInForIndexShard; - } - - @Override - public long ramBytesUsed() { - return 42; - } - - @Override - public void writeTo(StreamOutput out) throws IOException {} - } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 1290daadf8e6d..86a4e78f18af5 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -44,19 +44,15 @@ import org.apache.lucene.search.Weight; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; -import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesRequestCache.Key; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.node.MockNode; import org.opensearch.node.Node; @@ -374,20 +370,12 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(1, indicesService.indicesRefCount.refCount()); assertEquals(0L, cache.count()); - IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { - @Override - public void writeTo(StreamOutput out) throws IOException { - - } - + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(shard) { @Override public long ramBytesUsed() { return 42; } - @Override - public void onCached(Key key, BytesReference value) {} - @Override public boolean isOpen() { return true; @@ -397,15 +385,6 @@ public boolean isOpen() { public Object getCacheIdentity() { return this; } - - @Override - public void onHit() {} - - @Override - public void onMiss() {} - - @Override - public void onRemoval(RemovalNotification notification) {} }; cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo")); assertEquals(1L, cache.count()); From bb232f2e119acc8bac1d15b319eed3ec0f6d627d Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 8 Jan 2024 11:31:24 -0800 Subject: [PATCH 12/16] Changing cacheEntity function name and removing volatile with concurrentHashMap Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/indices/IndicesRequestCache.java | 10 +++++----- .../java/org/opensearch/indices/IndicesService.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index bba6217c504a9..48306792efcf9 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -116,7 +116,7 @@ public final class IndicesRequestCache implements RemovalListener cache; - private final Function cacheEntityFunction; + private final Function cacheEntityLookup; IndicesRequestCache(Settings settings, Function cacheEntityFunction) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); @@ -130,7 +130,7 @@ public final class IndicesRequestCache implements RemovalListener notification) { - cacheEntityFunction.apply(notification.getKey().shardId).onRemoval(notification); + cacheEntityLookup.apply(notification.getKey().shardId).onRemoval(notification); } BytesReference getOrCompute( @@ -379,10 +379,10 @@ synchronized void cleanCache() { if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(cacheEntityFunction.apply(key.shardId).getCacheIdentity())) { + if (currentFullClean.contains(cacheEntityLookup.apply(key.shardId).getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(cacheEntityFunction.apply(key.shardId), key.readerCacheKeyId))) { + if (currentKeysToClean.contains(new CleanupKey(cacheEntityLookup.apply(key.shardId), key.readerCacheKeyId))) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 635fe2a07ed2d..4b2f823014f82 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -333,7 +333,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private volatile Map indices = new ConcurrentHashMap<>(); + private Map indices = new ConcurrentHashMap<>(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); From 62b00582cf2a6d463253d65c59991a3d6ae9c60a Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 8 Jan 2024 14:56:05 -0800 Subject: [PATCH 13/16] Reverting to old changes for indices map Signed-off-by: Sagar Upadhyaya --- .../src/main/java/org/opensearch/indices/IndicesService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 4b2f823014f82..23ae13966eda8 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -178,7 +178,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -194,6 +193,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.common.collect.MapBuilder.newMapBuilder; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; @@ -333,7 +333,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private Map indices = new ConcurrentHashMap<>(); + private volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); From 5c5af1da592f86bc1ef16f903769e8c2fab90a81 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 9 Jan 2024 01:10:04 -0800 Subject: [PATCH 14/16] Fixing issue where index shard might get closed before it is removed from cache Signed-off-by: Sagar Upadhyaya --- .../index/OpenSearchDirectoryReader.java | 3 +- .../indices/IndicesRequestCache.java | 18 ++++++---- .../opensearch/indices/IndicesService.java | 9 +++-- .../indices/IndicesRequestCacheTests.java | 35 ++++++++++++++----- 4 files changed, 47 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index ce5251dd03260..f9a87b9e74214 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -40,6 +40,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.Optional; import java.util.UUID; /** @@ -91,7 +92,7 @@ public class DelegatingCacheHelper implements CacheHelper { DelegatingCacheHelper(CacheHelper cacheHelper) { this.cacheHelper = cacheHelper; - this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey()); + this.serializableCacheKey = new DelegatingCacheKey(Optional.ofNullable(cacheHelper).map(key -> getKey()).orElse(null)); } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 48306792efcf9..f847a1840d0ad 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -65,6 +65,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; @@ -116,9 +117,9 @@ public final class IndicesRequestCache implements RemovalListener cache; - private final Function cacheEntityLookup; + private final Function> cacheEntityLookup; - IndicesRequestCache(Settings settings, Function cacheEntityFunction) { + IndicesRequestCache(Settings settings, Function> cacheEntityFunction) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -145,7 +146,9 @@ void clear(CacheEntity entity) { @Override public void onRemoval(RemovalNotification notification) { - cacheEntityLookup.apply(notification.getKey().shardId).onRemoval(notification); + // In case this event happens for an old shard, we can safely ignore this as we don't keep track for old + // shards as part of request cache. + cacheEntityLookup.apply(notification.getKey().shardId).ifPresent(entity -> entity.onRemoval(notification)); } BytesReference getOrCompute( @@ -371,7 +374,7 @@ synchronized void cleanCache() { iterator.remove(); if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { // null indicates full cleanup, as does a closed shard - currentFullClean.add(cleanupKey.entity.getCacheIdentity()); + currentFullClean.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); } else { currentKeysToClean.add(cleanupKey); } @@ -379,10 +382,13 @@ synchronized void cleanCache() { if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(cacheEntityLookup.apply(key.shardId).getCacheIdentity())) { + if (currentFullClean.contains(key.shardId)) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(cacheEntityLookup.apply(key.shardId), key.readerCacheKeyId))) { + // If the flow comes here, then we should have a open shard available on node. + if (currentKeysToClean.contains( + new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) + )) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 23ae13966eda8..2cab59b14dd8e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -410,8 +410,13 @@ public IndicesService( this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = indexServiceSafe(shardId.getIndex()); - return new IndexShardCacheEntity(indexService.getShard(shardId.id())); + IndexService indexService = null; + try { + indexService = indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); })); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 3ae9e55c8ad7a..73728aec12e51 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -57,6 +57,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.cache.request.ShardRequestCache; @@ -67,6 +68,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Optional; import java.util.UUID; import static org.mockito.Mockito.mock; @@ -78,7 +80,7 @@ public void testBasicOperationsCache() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -137,8 +139,13 @@ public void testCacheDifferentReaders() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -240,7 +247,7 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -267,7 +274,7 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -305,8 +312,13 @@ public void testClearAllEntityIdentity() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); })); Directory dir = newDirectory(); @@ -395,8 +407,13 @@ public void testInvalidate() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); From ccde0ea1bbabf1b948876a533204b07f3c44d2ab Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 9 Jan 2024 09:05:49 -0800 Subject: [PATCH 15/16] Fixing IndicesServiceCloseTests unit test Signed-off-by: Sagar Upadhyaya --- .../java/org/opensearch/indices/IndicesServiceCloseTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 86a4e78f18af5..8a00cd2db21c9 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -383,7 +383,7 @@ public boolean isOpen() { @Override public Object getCacheIdentity() { - return this; + return shard; } }; cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo")); From b6e2c4a3c2aefc77022d4db4698b667c27d4d6a3 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Wed, 10 Jan 2024 15:32:43 -0800 Subject: [PATCH 16/16] Changing logic to fetch Index shard from shardId Signed-off-by: Sagar Upadhyaya --- .../main/java/org/opensearch/indices/IndicesService.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 2cab59b14dd8e..db5b93f073b03 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -410,10 +410,8 @@ public IndicesService( this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { + IndexService indexService = this.indices.get(shardId.getIndex().getUUID()); + if (indexService == null) { return Optional.empty(); } return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));