diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 01847a3205870..f89285ae61cc0 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.RelativeByteSizeValue; -import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Assertions; @@ -50,7 +49,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -58,9 +56,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.IntConsumer; -import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -262,6 +260,18 @@ void computeDecay() { } } + // used in tests + void maybeScheduleDecayAndNewEpoch() { + if (cache instanceof LFUCache lfuCache) { + lfuCache.maybeScheduleDecayAndNewEpoch(lfuCache.epoch.get()); + } + } + + // used in tests + long epoch() { + return ((LFUCache) cache).epoch.get(); + } + private interface Cache extends Releasable { CacheEntry get(K cacheKey, long fileLength, int region); @@ -311,7 +321,7 @@ private CacheEntry(T chunk) { private final BlobCacheMetrics blobCacheMetrics; - private final LongSupplier relativeTimeInMillisSupplier; + private final Runnable evictIncrementer; public SharedBlobCacheService( NodeEnvironment environment, @@ -320,7 +330,7 @@ public SharedBlobCacheService( String ioExecutor, BlobCacheMetrics blobCacheMetrics ) { - this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics, threadPool::relativeTimeInMillis); + this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics); } public SharedBlobCacheService( @@ -329,8 +339,7 @@ public SharedBlobCacheService( ThreadPool threadPool, String ioExecutor, String bulkExecutor, - BlobCacheMetrics blobCacheMetrics, - LongSupplier relativeTimeInMillisSupplier + BlobCacheMetrics blobCacheMetrics ) { this.threadPool = threadPool; this.ioExecutor = threadPool.executor(ioExecutor); @@ -372,7 +381,7 @@ public SharedBlobCacheService( this.recoveryRangeSize = BlobCacheUtils.toIntBytes(SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings).getBytes()); this.blobCacheMetrics = blobCacheMetrics; - this.relativeTimeInMillisSupplier = relativeTimeInMillisSupplier; + this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment; } public static long calculateCacheSize(Settings settings, long totalFsSize) { @@ -612,10 +621,6 @@ int getFreq(CacheFileRegion cacheFileRegion) { return -1; } - private long relativeTimeInMillis() { - return relativeTimeInMillisSupplier.getAsLong(); - } - @Override public void close() { sharedBytes.decRef(); @@ -671,10 +676,16 @@ public final boolean isEvicted() { } } + /** + * While this class has incRef and tryIncRef methods, incRefEnsureOpen and tryIncrefEnsureOpen should + * always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen + * (see {@link LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)}) + */ class CacheFileRegion extends EvictableRefCounted { final RegionKey regionKey; final SparseFileTracker tracker; + // io can be null when not init'ed or after evict/take volatile SharedBytes.IO io = null; CacheFileRegion(RegionKey regionKey, int regionSize) { @@ -688,6 +699,27 @@ public long physicalStartOffset() { return ioRef == null ? -1L : (long) regionKey.region * regionSize; } + public boolean tryIncRefEnsureOpen() { + if (tryIncRef()) { + ensureOpenOrDecRef(); + return true; + } + + return false; + } + + public void incRefEnsureOpen() { + incRef(); + ensureOpenOrDecRef(); + } + + private void ensureOpenOrDecRef() { + if (isEvicted()) { + decRef(); + throwAlreadyEvicted(); + } + } + // tries to evict this chunk if noone is holding onto its resources anymore // visible for tests. boolean tryEvict() { @@ -701,6 +733,17 @@ boolean tryEvict() { return false; } + boolean tryEvictNoDecRef() { + assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; + if (refCount() <= 1 && evict()) { + logger.trace("evicted and take {} with channel offset {}", regionKey, physicalStartOffset()); + evictCount.increment(); + return true; + } + + return false; + } + public boolean forceEvict() { assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; if (evict()) { @@ -723,23 +766,27 @@ protected void closeInternal() { logger.trace("closed {} with channel offset {}", regionKey, physicalStartOffset()); } - private void ensureOpen() { - if (isEvicted()) { - throwAlreadyEvicted(); - } - } - private static void throwAlreadyEvicted() { throwAlreadyClosed("File chunk is evicted"); } + /** + * Optimistically try to read from the region + * @return true if successful, i.e., not evicted and data available, false if evicted + */ boolean tryRead(ByteBuffer buf, long offset) throws IOException { - int readBytes = io.read(buf, getRegionRelativePosition(offset)); - if (isEvicted()) { - buf.position(buf.position() - readBytes); + SharedBytes.IO ioRef = this.io; + if (ioRef != null) { + int readBytes = ioRef.read(buf, getRegionRelativePosition(offset)); + if (isEvicted()) { + buf.position(buf.position() - readBytes); + return false; + } + return true; + } else { + // taken by someone else return false; } - return true; } /** @@ -761,9 +808,8 @@ void populate( ) { Releasable resource = null; try { - incRef(); + incRefEnsureOpen(); resource = Releasables.releaseOnce(this::decRef); - ensureOpen(); final List gaps = tracker.waitForRange( rangeToWrite, rangeToWrite, @@ -796,9 +842,8 @@ void populateAndRead( ) { Releasable resource = null; try { - incRef(); + incRefEnsureOpen(); resource = Releasables.releaseOnce(this::decRef); - ensureOpen(); final List gaps = tracker.waitForRange( rangeToWrite, rangeToRead, @@ -835,8 +880,7 @@ private AbstractRunnable fillGapRunnable(CacheFileRegion cacheFileRegion, RangeM return new AbstractRunnable() { @Override protected void doRun() throws Exception { - ensureOpen(); - if (cacheFileRegion.tryIncRef() == false) { + if (cacheFileRegion.tryIncRefEnsureOpen() == false) { throw new AlreadyClosedException("File chunk [" + cacheFileRegion.regionKey + "] has been released"); } try { @@ -1064,6 +1108,8 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac } private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, int channelPos, int len) { + assert fileRegion.io != null; + assert fileRegion.hasReferences(); assert regionOwners.get(fileRegion.io) == fileRegion; assert channelPos >= 0 && channelPos + len <= regionSize; return true; @@ -1111,17 +1157,20 @@ class LFUCacheEntry extends CacheEntry { LFUCacheEntry prev; LFUCacheEntry next; int freq; - volatile long lastAccessed; + volatile long lastAccessedEpoch; LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) { super(chunk); - this.lastAccessed = lastAccessed; + this.lastAccessedEpoch = lastAccessed; + // todo: consider whether freq=1 is still right for new entries. + // it could risk decaying to level 0 right after and thus potentially be evicted + // if the freq 1 LRU chain was short. this.freq = 1; } void touch() { - long now = relativeTimeInMillis(); - if (now - lastAccessed >= minTimeDelta) { + long now = epoch.get(); + if (now > lastAccessedEpoch) { maybePromote(now, this); } } @@ -1130,21 +1179,20 @@ void touch() { private final ConcurrentHashMap, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>(); private final LFUCacheEntry[] freqs; private final int maxFreq; - private final long minTimeDelta; - private final CacheDecayTask decayTask; + private final DecayAndNewEpochTask decayAndNewEpochTask; + + private final AtomicLong epoch = new AtomicLong(); @SuppressWarnings("unchecked") LFUCache(Settings settings) { this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings); - this.minTimeDelta = SHARED_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis(); freqs = (LFUCacheEntry[]) Array.newInstance(LFUCacheEntry.class, maxFreq); - decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings)); - decayTask.rescheduleIfNecessary(); + decayAndNewEpochTask = new DecayAndNewEpochTask(threadPool.generic()); } @Override public void close() { - decayTask.close(); + decayAndNewEpochTask.close(); } int getFreq(CacheFileRegion cacheFileRegion) { @@ -1154,7 +1202,7 @@ int getFreq(CacheFileRegion cacheFileRegion) { @Override public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { final RegionKey regionKey = new RegionKey<>(cacheKey, region); - final long now = relativeTimeInMillis(); + final long now = epoch.get(); // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path // if we did not find an entry var entry = keyMapping.get(regionKey); @@ -1165,7 +1213,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { // io is volatile, double locking is fine, as long as we assign it last. if (entry.chunk.io == null) { synchronized (entry.chunk) { - if (entry.chunk.io == null) { + if (entry.chunk.io == null && entry.chunk.isEvicted() == false) { return initChunk(entry); } } @@ -1173,7 +1221,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { assert assertChunkActiveOrEvicted(entry); // existing item, check if we need to promote item - if (now - entry.lastAccessed >= minTimeDelta) { + if (now > entry.lastAccessedEpoch) { maybePromote(now, entry); } @@ -1226,16 +1274,15 @@ private LFUCacheEntry initChunk(LFUCacheEntry entry) { assignToSlot(entry, freeSlot); } else { // need to evict something - int frequency; + SharedBytes.IO io; synchronized (SharedBlobCacheService.this) { - frequency = maybeEvict(); + io = maybeEvictAndTake(evictIncrementer); } - if (frequency > 0) { - blobCacheMetrics.getEvictedCountNonZeroFrequency().increment(); + if (io == null) { + io = freeRegions.poll(); } - final SharedBytes.IO freeSlotRetry = freeRegions.poll(); - if (freeSlotRetry != null) { - assignToSlot(entry, freeSlotRetry); + if (io != null) { + assignToSlot(entry, io); } else { boolean removed = keyMapping.remove(regionKey, entry); assert removed; @@ -1322,16 +1369,19 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { assert entry.prev != null || entry.chunk.isEvicted(); } - assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted(); + SharedBytes.IO io = entry.chunk.io; + assert io != null || entry.chunk.isEvicted(); + assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); return true; } - private void maybePromote(long now, LFUCacheEntry entry) { + private void maybePromote(long epoch, LFUCacheEntry entry) { synchronized (SharedBlobCacheService.this) { - if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) { + if (epoch > entry.lastAccessedEpoch && entry.freq < maxFreq - 1 && entry.chunk.isEvicted() == false) { unlink(entry); - entry.freq++; - entry.lastAccessed = now; + // go 2 up per epoch, allowing us to decay 1 every epoch. + entry.freq = Math.min(entry.freq + 2, maxFreq - 1); + entry.lastAccessedEpoch = epoch; pushEntryToBack(entry); } } @@ -1363,25 +1413,118 @@ private void unlink(final LFUCacheEntry entry) { assert invariant(entry, false); } + private void appendLevel1ToLevel0() { + assert Thread.holdsLock(SharedBlobCacheService.this); + var front0 = freqs[0]; + var front1 = freqs[1]; + if (front0 == null) { + freqs[0] = front1; + freqs[1] = null; + decrementFreqList(front1); + assert front1 == null || invariant(front1, true); + } else if (front1 != null) { + var back0 = front0.prev; + var back1 = front1.prev; + assert invariant(front0, true); + assert invariant(front1, true); + assert invariant(back0, true); + assert invariant(back1, true); + + decrementFreqList(front1); + + front0.prev = back1; + back0.next = front1; + front1.prev = back0; + assert back1.next == null; + + freqs[1] = null; + + assert invariant(front0, true); + assert invariant(front1, true); + assert invariant(back0, true); + assert invariant(back1, true); + } + } + + private void decrementFreqList(LFUCacheEntry entry) { + while (entry != null) { + entry.freq--; + entry = entry.next; + } + } + /** * Cycles through the {@link LFUCacheEntry} from 0 to max frequency and - * tries to evict a chunk if no one is holding onto its resources anymore + * tries to evict a chunk if no one is holding onto its resources anymore. * - * @return the frequency of the evicted entry as integer or -1 if no entry was evicted from cache + * Also regularly polls for free regions and thus might steal one in case any become available. + * + * @return a now free IO region or null if none available. */ - private int maybeEvict() { + private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) { assert Thread.holdsLock(SharedBlobCacheService.this); - for (int currentFreq = 0; currentFreq < maxFreq; currentFreq++) { - for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) { - boolean evicted = entry.chunk.tryEvict(); - if (evicted && entry.chunk.io != null) { - unlink(entry); - keyMapping.remove(entry.chunk.regionKey, entry); - return currentFreq; + long currentEpoch = epoch.get(); // must be captured before attempting to evict a freq 0 + SharedBytes.IO freq0 = maybeEvictAndTakeForFrequency(evictedNotification, 0); + if (freqs[0] == null) { + // no frequency 0 entries, let us switch epoch and decay so we get some for next time. + maybeScheduleDecayAndNewEpoch(currentEpoch); + } + if (freq0 != null) { + return freq0; + } + for (int currentFreq = 1; currentFreq < maxFreq; currentFreq++) { + // recheck this per freq in case we raced an eviction with an incref'er. + SharedBytes.IO freeRegion = freeRegions.poll(); + if (freeRegion != null) { + return freeRegion; + } + SharedBytes.IO taken = maybeEvictAndTakeForFrequency(evictedNotification, currentFreq); + if (taken != null) { + return taken; + } + } + // give up + return null; + } + + private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotification, int currentFreq) { + for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) { + boolean evicted = entry.chunk.tryEvictNoDecRef(); + if (evicted) { + try { + SharedBytes.IO ioRef = entry.chunk.io; + if (ioRef != null) { + try { + if (entry.chunk.refCount() == 1) { + // we own that one refcount (since we CAS'ed evicted to 1) + // grab io, rely on incref'ers also checking evicted field. + entry.chunk.io = null; + assert regionOwners.remove(ioRef) == entry.chunk; + return ioRef; + } + } finally { + unlink(entry); + keyMapping.remove(entry.chunk.regionKey, entry); + } + } + } finally { + entry.chunk.decRef(); + if (currentFreq > 0) { + evictedNotification.run(); + } } } } - return -1; + return null; + } + + /** + * Check if a new epoch is needed based on the input. The input epoch should be captured + * before the determination that a new epoch is needed is done. + * @param currentEpoch the epoch to check against if a new epoch is needed + */ + private void maybeScheduleDecayAndNewEpoch(long currentEpoch) { + decayAndNewEpochTask.spawnIfNotRunning(currentEpoch); } /** @@ -1406,39 +1549,66 @@ public boolean maybeEvictLeastUsed() { private void computeDecay() { synchronized (SharedBlobCacheService.this) { - long now = relativeTimeInMillis(); - for (int i = 0; i < maxFreq; i++) { - for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) { - if (entry.freq > 0 && now - entry.lastAccessed >= 2 * minTimeDelta) { - unlink(entry); - entry.freq--; - pushEntryToBack(entry); - } - } + appendLevel1ToLevel0(); + for (int i = 2; i < maxFreq; i++) { + assert freqs[i - 1] == null; + freqs[i - 1] = freqs[i]; + freqs[i] = null; + decrementFreqList(freqs[i - 1]); + assert freqs[i - 1] == null || invariant(freqs[i - 1], true); } } } - class CacheDecayTask extends AbstractAsyncTask { + class DecayAndNewEpochTask extends AbstractRunnable { - CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) { - super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true); + private final Executor executor; + private final AtomicLong pendingEpoch = new AtomicLong(); + private volatile boolean isClosed; + + DecayAndNewEpochTask(Executor executor) { + this.executor = executor; } @Override - protected boolean mustReschedule() { - return true; + protected void doRun() throws Exception { + if (isClosed == false) { + computeDecay(); + } } @Override - public void runInternal() { - computeDecay(); + public void onFailure(Exception e) { + logger.error("failed to run cache decay task", e); + } + + @Override + public void onAfter() { + assert pendingEpoch.get() == epoch.get() + 1; + epoch.incrementAndGet(); + } + + @Override + public void onRejection(Exception e) { + assert false : e; + logger.error("unexpected rejection", e); + epoch.incrementAndGet(); } @Override public String toString() { return "shared_cache_decay_task"; } + + public void spawnIfNotRunning(long currentEpoch) { + if (isClosed == false && pendingEpoch.compareAndSet(currentEpoch, currentEpoch + 1)) { + executor.execute(this); + } + } + + public void close() { + this.isClosed = true; + } } } } diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index fa58ab58ac95c..21fc9b9dbf9c2 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -34,8 +34,10 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.BrokenBarrierException; @@ -90,11 +92,11 @@ public void testBasicEviction() throws IOException { assertEquals(2, cacheService.freeRegionCount()); synchronized (cacheService) { - assertTrue(region1.tryEvict()); + assertTrue(tryEvict(region1)); } assertEquals(3, cacheService.freeRegionCount()); synchronized (cacheService) { - assertFalse(region1.tryEvict()); + assertFalse(tryEvict(region1)); } assertEquals(3, cacheService.freeRegionCount()); final var bytesReadFuture = new PlainActionFuture(); @@ -107,17 +109,17 @@ public void testBasicEviction() throws IOException { bytesReadFuture ); synchronized (cacheService) { - assertFalse(region0.tryEvict()); + assertFalse(tryEvict(region0)); } assertEquals(3, cacheService.freeRegionCount()); assertFalse(bytesReadFuture.isDone()); taskQueue.runAllRunnableTasks(); synchronized (cacheService) { - assertTrue(region0.tryEvict()); + assertTrue(tryEvict(region0)); } assertEquals(4, cacheService.freeRegionCount()); synchronized (cacheService) { - assertTrue(region2.tryEvict()); + assertTrue(tryEvict(region2)); } assertEquals(5, cacheService.freeRegionCount()); assertTrue(bytesReadFuture.isDone()); @@ -125,6 +127,18 @@ public void testBasicEviction() throws IOException { } } + private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion region1) { + if (randomBoolean()) { + return region1.tryEvict(); + } else { + boolean result = region1.tryEvictNoDecRef(); + if (result) { + region1.decRef(); + } + return result; + } + } + public void testAutoEviction() throws IOException { Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") @@ -163,7 +177,7 @@ public void testAutoEviction() throws IOException { // explicitly evict region 1 synchronized (cacheService) { - assertTrue(region1.tryEvict()); + assertTrue(tryEvict(region1)); } assertEquals(1, cacheService.freeRegionCount()); } @@ -237,9 +251,10 @@ public void testForceEvictResponse() throws IOException { } public void testDecay() throws IOException { + // we have 8 regions Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") - .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(400)).getStringRep()) .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) .put("path.home", createTempDir()) .build(); @@ -254,51 +269,88 @@ public void testDecay() throws IOException { BlobCacheMetrics.NOOP ) ) { + assertEquals(4, cacheService.freeRegionCount()); + final var cacheKey1 = generateCacheKey(); final var cacheKey2 = generateCacheKey(); - assertEquals(5, cacheService.freeRegionCount()); + final var cacheKey3 = generateCacheKey(); + // add a region that we can evict when provoking first decay + cacheService.get("evictkey", size(250), 0); + assertEquals(3, cacheService.freeRegionCount()); final var region0 = cacheService.get(cacheKey1, size(250), 0); - assertEquals(4, cacheService.freeRegionCount()); + assertEquals(2, cacheService.freeRegionCount()); final var region1 = cacheService.get(cacheKey2, size(250), 1); - assertEquals(3, cacheService.freeRegionCount()); + assertEquals(1, cacheService.freeRegionCount()); + final var region2 = cacheService.get(cacheKey3, size(250), 1); + assertEquals(0, cacheService.freeRegionCount()); assertEquals(1, cacheService.getFreq(region0)); assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); + AtomicLong expectedEpoch = new AtomicLong(); + Runnable triggerDecay = () -> { + assertThat(taskQueue.hasRunnableTasks(), is(false)); + cacheService.get(expectedEpoch.toString(), size(250), 0); + assertThat(taskQueue.hasRunnableTasks(), is(true)); + taskQueue.runAllRunnableTasks(); + assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet())); + }; + + triggerDecay.run(); - taskQueue.advanceTime(); - taskQueue.runAllRunnableTasks(); + cacheService.get(cacheKey1, size(250), 0); + cacheService.get(cacheKey2, size(250), 1); + cacheService.get(cacheKey3, size(250), 1); + + triggerDecay.run(); final var region0Again = cacheService.get(cacheKey1, size(250), 0); assertSame(region0Again, region0); - assertEquals(2, cacheService.getFreq(region0)); + assertEquals(3, cacheService.getFreq(region0)); assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); + + triggerDecay.run(); - taskQueue.advanceTime(); - taskQueue.runAllRunnableTasks(); cacheService.get(cacheKey1, size(250), 0); - assertEquals(3, cacheService.getFreq(region0)); + assertEquals(4, cacheService.getFreq(region0)); cacheService.get(cacheKey1, size(250), 0); + assertEquals(4, cacheService.getFreq(region0)); + assertEquals(0, cacheService.getFreq(region1)); + assertEquals(0, cacheService.getFreq(region2)); + + // ensure no freq=0 entries + cacheService.get(cacheKey2, size(250), 1); + cacheService.get(cacheKey3, size(250), 1); + assertEquals(2, cacheService.getFreq(region1)); + assertEquals(2, cacheService.getFreq(region2)); + + triggerDecay.run(); + assertEquals(3, cacheService.getFreq(region0)); + assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); - // advance 2 ticks (decay only starts after 2 ticks) - taskQueue.advanceTime(); - taskQueue.runAllRunnableTasks(); - taskQueue.advanceTime(); - taskQueue.runAllRunnableTasks(); + triggerDecay.run(); assertEquals(2, cacheService.getFreq(region0)); assertEquals(0, cacheService.getFreq(region1)); + assertEquals(0, cacheService.getFreq(region2)); - // advance another tick - taskQueue.advanceTime(); - taskQueue.runAllRunnableTasks(); + // ensure no freq=0 entries + cacheService.get(cacheKey2, size(250), 1); + cacheService.get(cacheKey3, size(250), 1); + assertEquals(2, cacheService.getFreq(region1)); + assertEquals(2, cacheService.getFreq(region2)); + + triggerDecay.run(); assertEquals(1, cacheService.getFreq(region0)); - assertEquals(0, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region1)); + assertEquals(1, cacheService.getFreq(region2)); - // advance another tick - taskQueue.advanceTime(); - taskQueue.runAllRunnableTasks(); + triggerDecay.run(); assertEquals(0, cacheService.getFreq(region0)); assertEquals(0, cacheService.getFreq(region1)); + assertEquals(0, cacheService.getFreq(region2)); } } @@ -308,12 +360,12 @@ public void testDecay() throws IOException { */ public void testGetMultiThreaded() throws IOException { int threads = between(2, 10); + int regionCount = between(1, 20); + // if we have enough regions, a get should always have a result (except for explicit evict interference) + final boolean allowAlreadyClosed = regionCount < threads; Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") - .put( - SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), - ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep() - ) + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regionCount * 100L)).getStringRep()) .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) .put(SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getKey(), randomFrom("0", "1ms", "10s")) .put("path.home", createTempDir()) @@ -343,11 +395,13 @@ public void testGetMultiThreaded() throws IOException { ready.await(); for (int i = 0; i < iterations; ++i) { try { - SharedBlobCacheService.CacheFileRegion cacheFileRegion = cacheService.get( - cacheKeys[i], - fileLength, - regions[i] - ); + SharedBlobCacheService.CacheFileRegion cacheFileRegion; + try { + cacheFileRegion = cacheService.get(cacheKeys[i], fileLength, regions[i]); + } catch (AlreadyClosedException e) { + assert allowAlreadyClosed || e.getMessage().equals("evicted during free region allocation") : e; + throw e; + } if (cacheFileRegion.tryIncRef()) { if (yield[i] == 0) { Thread.yield(); @@ -415,8 +469,7 @@ public void execute(Runnable command) { threadPool, ThreadPool.Names.GENERIC, "bulk", - BlobCacheMetrics.NOOP, - threadPool::relativeTimeInMillis + BlobCacheMetrics.NOOP ) ) { { @@ -477,8 +530,7 @@ public ExecutorService executor(String name) { threadPool, ThreadPool.Names.GENERIC, "bulk", - BlobCacheMetrics.NOOP, - threadPool::relativeTimeInMillis + BlobCacheMetrics.NOOP ) ) { @@ -708,8 +760,7 @@ public void testCacheSizeChanges() throws IOException { } public void testMaybeEvictLeastUsed() throws Exception { - final int numRegions = 3; - randomIntBetween(1, 500); + final int numRegions = 10; final long regionSize = size(1L); Settings settings = Settings.builder() .put(NODE_NAME_SETTING.getKey(), "node") @@ -728,11 +779,10 @@ public void testMaybeEvictLeastUsed() throws Exception { taskQueue.getThreadPool(), ThreadPool.Names.GENERIC, "bulk", - BlobCacheMetrics.NOOP, - relativeTimeInMillis::get + BlobCacheMetrics.NOOP ) ) { - final Set cacheKeys = new HashSet<>(); + final Map.CacheFileRegion> cacheEntries = new HashMap<>(); assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions)); assertThat("Cache has no entries", cacheService.maybeEvictLeastUsed(), is(false)); @@ -748,8 +798,7 @@ public void testMaybeEvictLeastUsed() throws Exception { ActionListener.noop() ); assertThat(cacheService.getFreq(entry), equalTo(1)); - relativeTimeInMillis.incrementAndGet(); - cacheKeys.add(cacheKey); + cacheEntries.put(cacheKey, entry); } assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); @@ -760,33 +809,41 @@ public void testMaybeEvictLeastUsed() throws Exception { assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false)); - // simulate elapsed time - var minInternalMillis = SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getDefault(Settings.EMPTY).millis(); - relativeTimeInMillis.addAndGet(minInternalMillis); + cacheService.maybeScheduleDecayAndNewEpoch(); + taskQueue.runAllRunnableTasks(); + + cacheEntries.keySet().forEach(key -> cacheService.get(key, regionSize, 0)); + cacheService.maybeScheduleDecayAndNewEpoch(); + taskQueue.runAllRunnableTasks(); // touch some random cache entries - var unusedCacheKeys = Set.copyOf(randomSubsetOf(cacheKeys)); - cacheKeys.forEach(key -> { - if (unusedCacheKeys.contains(key) == false) { - var entry = cacheService.get(key, regionSize, 0); - assertThat(cacheService.getFreq(entry), equalTo(2)); - } - }); + var useedCacheKeys = Set.copyOf(randomSubsetOf(cacheEntries.keySet())); + useedCacheKeys.forEach(key -> cacheService.get(key, regionSize, 0)); + + cacheEntries.forEach( + (key, entry) -> assertThat(cacheService.getFreq(entry), useedCacheKeys.contains(key) ? equalTo(3) : equalTo(1)) + ); assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false)); - for (int i = 1; i <= unusedCacheKeys.size(); i++) { - // need to advance time and compute decay to decrease frequencies in cache and have an evictable entry - relativeTimeInMillis.addAndGet(minInternalMillis); - cacheService.computeDecay(); + cacheService.maybeScheduleDecayAndNewEpoch(); + taskQueue.runAllRunnableTasks(); - assertThat("Cache entry is old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(true)); + assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0)); + cacheEntries.forEach( + (key, entry) -> assertThat(cacheService.getFreq(entry), useedCacheKeys.contains(key) ? equalTo(2) : equalTo(0)) + ); + + var zeroFrequencyCacheEntries = cacheEntries.size() - useedCacheKeys.size(); + for (int i = 0; i < zeroFrequencyCacheEntries; i++) { assertThat(cacheService.freeRegionCount(), equalTo(i)); + assertThat("Cache entry is old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(true)); + assertThat(cacheService.freeRegionCount(), equalTo(i + 1)); } assertThat("No more cache entries old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false)); - assertThat(cacheService.freeRegionCount(), equalTo(unusedCacheKeys.size())); + assertThat(cacheService.freeRegionCount(), equalTo(zeroFrequencyCacheEntries)); } } @@ -826,8 +883,7 @@ public void execute(Runnable command) { threadPool, ThreadPool.Names.GENERIC, "bulk", - BlobCacheMetrics.NOOP, - relativeTimeInMillis::get + BlobCacheMetrics.NOOP ) ) { { @@ -942,8 +998,7 @@ public void testPopulate() throws Exception { taskQueue.getThreadPool(), ThreadPool.Names.GENERIC, ThreadPool.Names.GENERIC, - BlobCacheMetrics.NOOP, - relativeTimeInMillis::get + BlobCacheMetrics.NOOP ) ) { final var cacheKey = generateCacheKey();