From 3ae5931001313580a4324d7bbd0119aed6672d68 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 24 Jan 2024 13:28:18 +0100 Subject: [PATCH] Blob-cache get remove false negatives The blob cache would sometimes respond with an already closed exception even though regions could be made available. Fixed to only do so when there really are no available regions (which should never happen with region count > thread count, i.e., normally). One exception is explicit evict, which only happens on explicit clear cache or corruptions. --- .../shared/SharedBlobCacheService.java | 68 ++++++++++++++++--- .../shared/SharedBlobCacheServiceTests.java | 45 ++++++++---- 2 files changed, 92 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 01847a3205870..e65f8754d55f1 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -675,6 +675,7 @@ class CacheFileRegion extends EvictableRefCounted { final RegionKey regionKey; final SparseFileTracker tracker; + // io can be null when not init'ed or after evict/take volatile SharedBytes.IO io = null; CacheFileRegion(RegionKey regionKey, int regionSize) { @@ -701,6 +702,16 @@ boolean tryEvict() { return false; } + boolean tryEvictNoDecRef() { + assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; + if (refCount() <= 1 && evict()) { + logger.trace("evicted and take {} with channel offset {}", regionKey, physicalStartOffset()); + evictCount.increment(); + return true; + } + + return false; + } public boolean forceEvict() { assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; if (evict()) { @@ -1165,7 +1176,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { // io is volatile, double locking is fine, as long as we assign it last. if (entry.chunk.io == null) { synchronized (entry.chunk) { - if (entry.chunk.io == null) { + if (entry.chunk.io == null && entry.chunk.isEvicted() == false) { return initChunk(entry); } } @@ -1226,16 +1237,15 @@ private LFUCacheEntry initChunk(LFUCacheEntry entry) { assignToSlot(entry, freeSlot); } else { // need to evict something - int frequency; + SharedBytes.IO io; synchronized (SharedBlobCacheService.this) { - frequency = maybeEvict(); + io = maybeEvictAndTake(blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment); } - if (frequency > 0) { - blobCacheMetrics.getEvictedCountNonZeroFrequency().increment(); + if (io == null) { + io = freeRegions.poll(); } - final SharedBytes.IO freeSlotRetry = freeRegions.poll(); - if (freeSlotRetry != null) { - assignToSlot(entry, freeSlotRetry); + if (io != null) { + assignToSlot(entry, io); } else { boolean removed = keyMapping.remove(regionKey, entry); assert removed; @@ -1322,7 +1332,9 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { assert entry.prev != null || entry.chunk.isEvicted(); } - assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted(); + SharedBytes.IO io = entry.chunk.io; + assert io != null || entry.chunk.isEvicted(); + assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); return true; } @@ -1384,6 +1396,44 @@ private int maybeEvict() { return -1; } + private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) { + assert Thread.holdsLock(SharedBlobCacheService.this); + for (int currentFreq = 0; currentFreq < maxFreq; currentFreq++) { + // recheck this per freq in case we raced an eviction with an incref'er. + SharedBytes.IO freeRegion = freeRegions.poll(); + if (freeRegion != null) { + return freeRegion; + } + for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) { + boolean evicted = entry.chunk.tryEvictNoDecRef(); + if (evicted) { + try { + if (entry.chunk.io != null) { + try { + if (entry.chunk.refCount() == 1) { + // grab io, rely on incref'ers also checking evicted field. + final SharedBytes.IO result = entry.chunk.io; + entry.chunk.io = null; + assert regionOwners.remove(result) == entry.chunk; + return result; + } + } finally { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + } + } + } finally { + entry.chunk.decRef(); + if (currentFreq > 0) { + evictedNotification.run(); + } + } + } + } + } + // give up + return null; + } /** * This method tries to evict the least used {@link LFUCacheEntry}. Only entries with the lowest possible frequency are considered * for eviction. diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index fa58ab58ac95c..47169e9669c4e 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -90,11 +90,11 @@ public void testBasicEviction() throws IOException { assertEquals(2, cacheService.freeRegionCount()); synchronized (cacheService) { - assertTrue(region1.tryEvict()); + assertTrue(tryEvict(region1)); } assertEquals(3, cacheService.freeRegionCount()); synchronized (cacheService) { - assertFalse(region1.tryEvict()); + assertFalse(tryEvict(region1)); } assertEquals(3, cacheService.freeRegionCount()); final var bytesReadFuture = new PlainActionFuture(); @@ -107,17 +107,17 @@ public void testBasicEviction() throws IOException { bytesReadFuture ); synchronized (cacheService) { - assertFalse(region0.tryEvict()); + assertFalse(tryEvict(region0)); } assertEquals(3, cacheService.freeRegionCount()); assertFalse(bytesReadFuture.isDone()); taskQueue.runAllRunnableTasks(); synchronized (cacheService) { - assertTrue(region0.tryEvict()); + assertTrue(tryEvict(region0)); } assertEquals(4, cacheService.freeRegionCount()); synchronized (cacheService) { - assertTrue(region2.tryEvict()); + assertTrue(tryEvict(region2)); } assertEquals(5, cacheService.freeRegionCount()); assertTrue(bytesReadFuture.isDone()); @@ -125,6 +125,18 @@ public void testBasicEviction() throws IOException { } } + private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion region1) { + if (randomBoolean()) { + return region1.tryEvict(); + } else { + boolean result = region1.tryEvictNoDecRef(); + if (result) { + region1.decRef(); + } + return result; + } + } + public void testAutoEviction() throws IOException { Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") @@ -163,7 +175,7 @@ public void testAutoEviction() throws IOException { // explicitly evict region 1 synchronized (cacheService) { - assertTrue(region1.tryEvict()); + assertTrue(tryEvict(region1)); } assertEquals(1, cacheService.freeRegionCount()); } @@ -308,11 +320,14 @@ public void testDecay() throws IOException { */ public void testGetMultiThreaded() throws IOException { int threads = between(2, 10); + int regionCount = between(1, 20); + // if we have enough regions, a get should always have a result (except for explicit evict interference) + final boolean allowAlreadyClosed = regionCount < threads; Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") .put( SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), - ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep() + ByteSizeValue.ofBytes(size(regionCount * 100L)).getStringRep() ) .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) .put(SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getKey(), randomFrom("0", "1ms", "10s")) @@ -343,11 +358,17 @@ public void testGetMultiThreaded() throws IOException { ready.await(); for (int i = 0; i < iterations; ++i) { try { - SharedBlobCacheService.CacheFileRegion cacheFileRegion = cacheService.get( - cacheKeys[i], - fileLength, - regions[i] - ); + SharedBlobCacheService.CacheFileRegion cacheFileRegion; + try { + cacheFileRegion = cacheService.get( + cacheKeys[i], + fileLength, + regions[i] + ); + } catch (AlreadyClosedException e) { + assert allowAlreadyClosed || e.getMessage().equals("evicted during free region allocation"): e; + throw e; + } if (cacheFileRegion.tryIncRef()) { if (yield[i] == 0) { Thread.yield();