Skip to content

Commit

Permalink
[Tiered Caching] [Bug Fix] Use concurrentMap instead of HashMap to fi…
Browse files Browse the repository at this point in the history
…x Concurrent Modification Exception (#14221)

* use concurrentmap

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert feature flags

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* changelog to releaselog

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* use concurrentmap

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert feature flags

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* changelog to releaselog

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert the test removal

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert the conflict resolutions

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

---------

Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 authored Jun 13, 2024
1 parent 435af89 commit ccf5289
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 11 deletions.
3 changes: 2 additions & 1 deletion release-notes/opensearch.release-notes-2.15.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@
- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015))
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
- Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146))
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221))
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -507,7 +506,7 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;
Expand Down Expand Up @@ -568,7 +567,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// If the key doesn't exist, it's added with a value of 1.
// If the key exists, its value is incremented by 1.
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId);
}

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
}

/**
Expand Down Expand Up @@ -826,7 +831,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -105,7 +105,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down Expand Up @@ -793,8 +795,54 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
IOUtils.close(secondReader);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

for (int i = 0; i < numberOfThreads; i++) {
executorService.submit(() -> {
phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time
try {
for (int j = 0; j < numberOfIterations; j++) {
cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString());
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
}
});
}
phaser.arriveAndAwaitAdvance(); // Start all threads

// Main thread iterates over the map
executorService.submit(() -> {
try {
for (int j = 0; j < numberOfIterations; j++) {
cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> {
v.forEach((k1, v1) -> {
// Accessing the map to create contention
v.get(k1);
});
});
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
assertFalse(exceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
Expand All @@ -808,6 +856,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
}

private Loader getLoader(DirectoryReader reader) {
return new Loader(reader, 0);
}
Expand Down

0 comments on commit ccf5289

Please sign in to comment.