Skip to content

Commit

Permalink
explict non-volatile
Browse files Browse the repository at this point in the history
  • Loading branch information
henningandersen committed Aug 24, 2024
1 parent 26772d5 commit b54fc15
Showing 1 changed file with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ private static VarHandle findIOVarHandle() {
// a `CacheFileRegion` without checking the value is non-null (with a volatile read, ensuring the value is visible in that thread).
// we assume any IndexInput passing among threads (slicing etc) is done with proper happens-before semantics (otherwise they'd
// themselves break).
SharedBytes.IO io = null;
private SharedBytes.IO io = null;

CacheFileRegion(SharedBlobCacheService<KeyType> blobCacheService, RegionKey<KeyType> regionKey, int regionSize) {
this.blobCacheService = blobCacheService;
Expand All @@ -820,8 +820,9 @@ private static VarHandle findIOVarHandle() {
tracker = new SparseFileTracker("file", regionSize);
}

public long physicalStartOffset() {
var ioRef = io;
// only used for logging
private long physicalStartOffset() {
var ioRef = nonVolatileIO();
return ioRef == null ? -1L : (long) regionKey.region * blobCacheService.regionSize;
}

Expand Down Expand Up @@ -885,6 +886,7 @@ public boolean forceEvict() {
protected void closeInternal() {
// now actually free the region associated with this chunk
// we held the "this" lock when this was evicted, hence if io is not filled in, chunk will never be registered.
SharedBytes.IO io = volatileIO();
if (io != null) {
assert blobCacheService.regionOwners.remove(io) == this;
blobCacheService.freeRegions.add(io);
Expand All @@ -904,12 +906,13 @@ private void volatileIO(SharedBytes.IO io) {
VH_IO.setVolatile(this, io);
}

private SharedBytes.IO nonVolatileIO() { return io; }
/**
* Optimistically try to read from the region
* @return true if successful, i.e., not evicted and data available, false if evicted
*/
boolean tryRead(ByteBuffer buf, long offset) throws IOException {
SharedBytes.IO ioRef = this.io;
SharedBytes.IO ioRef = nonVolatileIO();
if (ioRef != null) {
int readBytes = ioRef.read(buf, blobCacheService.getRegionRelativePosition(offset));
if (isEvicted()) {
Expand Down Expand Up @@ -947,7 +950,7 @@ void populate(
rangeToWrite,
rangeToWrite,
Assertions.ENABLED ? ActionListener.releaseAfter(ActionListener.running(() -> {
assert blobCacheService.regionOwners.get(io) == this;
assert blobCacheService.regionOwners.get(nonVolatileIO()) == this;
}), refs.acquire()) : refs.acquireListener()
);
if (gaps.isEmpty()) {
Expand Down Expand Up @@ -983,7 +986,7 @@ void populateAndRead(
rangeToWrite,
rangeToRead,
ActionListener.releaseAfter(listener, refs.acquire()).delegateFailureAndWrap((l, success) -> {
var ioRef = io;
var ioRef = nonVolatileIO();
assert blobCacheService.regionOwners.get(ioRef) == this;
final int start = Math.toIntExact(rangeToRead.start());
final int read = reader.onRangeAvailable(ioRef, start, start, Math.toIntExact(rangeToRead.length()));
Expand Down Expand Up @@ -1041,7 +1044,7 @@ private Runnable fillGapRunnable(
ActionListener<Void> listener
) {
return () -> ActionListener.run(listener, l -> {
var ioRef = io;
var ioRef = nonVolatileIO();
assert blobCacheService.regionOwners.get(ioRef) == CacheFileRegion.this;
assert CacheFileRegion.this.hasReferences() : CacheFileRegion.this;
int start = Math.toIntExact(gap.start());
Expand Down Expand Up @@ -1288,8 +1291,8 @@ public void fillCacheRange(
len,
progressUpdater,
Assertions.ENABLED ? ActionListener.runBefore(completionListener, () -> {
assert regionOwners.get(fileRegion.io) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]";
assert regionOwners.get(fileRegion.nonVolatileIO()) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.nonVolatileIO() + "]";
}) : completionListener
);
}
Expand All @@ -1310,18 +1313,18 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac
return (channel, channelPos, relativePos, len) -> {
assert assertValidRegionAndLength(fileRegion, channelPos, len);
final int bytesRead = adjustedReader.onRangeAvailable(channel, channelPos, relativePos, len);
assert regionOwners.get(fileRegion.io) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]";
assert regionOwners.get(fileRegion.nonVolatileIO()) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.nonVolatileIO() + "]";
return bytesRead;
};
}
return adjustedReader;
}

private boolean assertValidRegionAndLength(CacheFileRegion<KeyType> fileRegion, int channelPos, int len) {
assert fileRegion.io != null;
assert fileRegion.nonVolatileIO() != null;
assert fileRegion.hasReferences();
assert regionOwners.get(fileRegion.io) == fileRegion;
assert regionOwners.get(fileRegion.nonVolatileIO()) == fileRegion;
assert channelPos >= 0 && channelPos + len <= regionSize;
return true;
}
Expand Down Expand Up @@ -1669,7 +1672,7 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) {
assert entry.prev != null || entry.chunk.isEvicted();

}
SharedBytes.IO io = entry.chunk.io;
SharedBytes.IO io = entry.chunk.nonVolatileIO();
assert io != null || entry.chunk.isEvicted();
assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted();
return true;
Expand Down Expand Up @@ -1837,7 +1840,7 @@ public boolean maybeEvictLeastUsed() {
synchronized (SharedBlobCacheService.this) {
for (LFUCacheEntry entry = freqs[0]; entry != null; entry = entry.next) {
boolean evicted = entry.chunk.tryEvict();
if (evicted && entry.chunk.io != null) {
if (evicted && entry.chunk.volatileIO() != null) {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
return true;
Expand Down

0 comments on commit b54fc15

Please sign in to comment.