diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index dc1a863389daf..412bfa65a4488 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -810,7 +810,7 @@ private static VarHandle findIOVarHandle() { // a `CacheFileRegion` without checking the value is non-null (with a volatile read, ensuring the value is visible in that thread). // we assume any IndexInput passing among threads (slicing etc) is done with proper happens-before semantics (otherwise they'd // themselves break). - SharedBytes.IO io = null; + private SharedBytes.IO io = null; CacheFileRegion(SharedBlobCacheService blobCacheService, RegionKey regionKey, int regionSize) { this.blobCacheService = blobCacheService; @@ -820,8 +820,9 @@ private static VarHandle findIOVarHandle() { tracker = new SparseFileTracker("file", regionSize); } - public long physicalStartOffset() { - var ioRef = io; + // only used for logging + private long physicalStartOffset() { + var ioRef = nonVolatileIO(); return ioRef == null ? -1L : (long) regionKey.region * blobCacheService.regionSize; } @@ -885,6 +886,7 @@ public boolean forceEvict() { protected void closeInternal() { // now actually free the region associated with this chunk // we held the "this" lock when this was evicted, hence if io is not filled in, chunk will never be registered. + SharedBytes.IO io = volatileIO(); if (io != null) { assert blobCacheService.regionOwners.remove(io) == this; blobCacheService.freeRegions.add(io); @@ -904,12 +906,13 @@ private void volatileIO(SharedBytes.IO io) { VH_IO.setVolatile(this, io); } + private SharedBytes.IO nonVolatileIO() { return io; } /** * Optimistically try to read from the region * @return true if successful, i.e., not evicted and data available, false if evicted */ boolean tryRead(ByteBuffer buf, long offset) throws IOException { - SharedBytes.IO ioRef = this.io; + SharedBytes.IO ioRef = nonVolatileIO(); if (ioRef != null) { int readBytes = ioRef.read(buf, blobCacheService.getRegionRelativePosition(offset)); if (isEvicted()) { @@ -947,7 +950,7 @@ void populate( rangeToWrite, rangeToWrite, Assertions.ENABLED ? ActionListener.releaseAfter(ActionListener.running(() -> { - assert blobCacheService.regionOwners.get(io) == this; + assert blobCacheService.regionOwners.get(nonVolatileIO()) == this; }), refs.acquire()) : refs.acquireListener() ); if (gaps.isEmpty()) { @@ -983,7 +986,7 @@ void populateAndRead( rangeToWrite, rangeToRead, ActionListener.releaseAfter(listener, refs.acquire()).delegateFailureAndWrap((l, success) -> { - var ioRef = io; + var ioRef = nonVolatileIO(); assert blobCacheService.regionOwners.get(ioRef) == this; final int start = Math.toIntExact(rangeToRead.start()); final int read = reader.onRangeAvailable(ioRef, start, start, Math.toIntExact(rangeToRead.length())); @@ -1041,7 +1044,7 @@ private Runnable fillGapRunnable( ActionListener listener ) { return () -> ActionListener.run(listener, l -> { - var ioRef = io; + var ioRef = nonVolatileIO(); assert blobCacheService.regionOwners.get(ioRef) == CacheFileRegion.this; assert CacheFileRegion.this.hasReferences() : CacheFileRegion.this; int start = Math.toIntExact(gap.start()); @@ -1288,8 +1291,8 @@ public void fillCacheRange( len, progressUpdater, Assertions.ENABLED ? ActionListener.runBefore(completionListener, () -> { - assert regionOwners.get(fileRegion.io) == fileRegion - : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]"; + assert regionOwners.get(fileRegion.nonVolatileIO()) == fileRegion + : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.nonVolatileIO() + "]"; }) : completionListener ); } @@ -1310,8 +1313,8 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac return (channel, channelPos, relativePos, len) -> { assert assertValidRegionAndLength(fileRegion, channelPos, len); final int bytesRead = adjustedReader.onRangeAvailable(channel, channelPos, relativePos, len); - assert regionOwners.get(fileRegion.io) == fileRegion - : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]"; + assert regionOwners.get(fileRegion.nonVolatileIO()) == fileRegion + : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.nonVolatileIO() + "]"; return bytesRead; }; } @@ -1319,9 +1322,9 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac } private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, int channelPos, int len) { - assert fileRegion.io != null; + assert fileRegion.nonVolatileIO() != null; assert fileRegion.hasReferences(); - assert regionOwners.get(fileRegion.io) == fileRegion; + assert regionOwners.get(fileRegion.nonVolatileIO()) == fileRegion; assert channelPos >= 0 && channelPos + len <= regionSize; return true; } @@ -1669,7 +1672,7 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) { assert entry.prev != null || entry.chunk.isEvicted(); } - SharedBytes.IO io = entry.chunk.io; + SharedBytes.IO io = entry.chunk.nonVolatileIO(); assert io != null || entry.chunk.isEvicted(); assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); return true; @@ -1837,7 +1840,7 @@ public boolean maybeEvictLeastUsed() { synchronized (SharedBlobCacheService.this) { for (LFUCacheEntry entry = freqs[0]; entry != null; entry = entry.next) { boolean evicted = entry.chunk.tryEvict(); - if (evicted && entry.chunk.io != null) { + if (evicted && entry.chunk.volatileIO() != null) { unlink(entry); keyMapping.remove(entry.chunk.regionKey, entry); return true;