Skip to content

Commit

Permalink
Fix live version map unsafe flag
Browse files Browse the repository at this point in the history
The live version map can unexpectedly become unsafe even when only
single-threaded indexing is going on. This can force translog replay
to transition the version map from unsafe to safe.
  • Loading branch information
henningandersen committed Oct 31, 2024
1 parent a48925e commit fc87b56
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.atomic.AtomicLong;

/** Maps _uid value to its version information. */
public final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
public class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();

Expand Down Expand Up @@ -202,7 +202,12 @@ Maps invalidateOldMapForAssert() {
*/
Maps invalidateOldMap(LiveVersionMapArchive archive) {
    // Hand the old lookup over to the archive before it is dropped from the returned Maps.
    archive.afterRefresh(old);
    // The replacement keeps `current` and swaps `old` for the empty lookup.
    Maps result = new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
    // Carry the safe-access flag over so it is not lost when this instance is replaced.
    // Not JMM compatible (plain field read/write), same caveat as in beforeRefresh.
    if (needsSafeAccess) {
        result.needsSafeAccess = true;
    }
    return result;
}

void put(BytesRef uid, VersionValue version) {
Expand Down Expand Up @@ -282,7 +287,15 @@ public void beforeRefresh() throws IOException {
// map. While reopen is running, any lookup will first
// try this new map, then fallback to old, then to the
// current searcher:
maps = maps.buildTransitionMap();
Maps original = maps;
Maps transitionMap = original.buildTransitionMap();
maps = transitionMap;
// this is still not JMM safe, but makes the test pass. There are a few options:
// 1. Do read then modify instead of writing to it in enforceSafeAccess. The read can be non-volatile, the write volatile.
// 2. Make the field volatile (but the comment on it seems to indicate that it would be bad for perf).
if (original.needsSafeAccess) {
transitionMap.needsSafeAccess = true;
}
assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
// line and this one, but that should be minor, and the error won't accumulate over time:
Expand Down Expand Up @@ -345,7 +358,14 @@ boolean isUnsafe() {
}

/**
 * Sets {@code needsSafeAccess} on the {@code maps} instance and keeps doing so until the
 * instance that was flagged is still the one referenced by {@code maps}.
 */
void enforceSafeAccess() {
    Maps copy = maps;
    copy.needsSafeAccess = true;
    Maps nextCopy;
    // `maps` can be reassigned (e.g. by beforeRefresh) while the flag is being written;
    // loop until we have the same maps after the assignment
    while ((nextCopy = maps) != copy) {
        nextCopy.needsSafeAccess = true;
        copy = nextCopy;
    }
}

boolean isSafeAccessRequired() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;

Expand Down Expand Up @@ -510,4 +511,42 @@ public void testVersionMapReclaimableRamBytes() throws IOException {
assertEquals(map.reclaimableRefreshRamBytes(), 0L);
assertEquals(map.ramBytesUsedForRefresh(), 0L);
}

/**
 * Checks that a map on which every operation enforces safe access never reports itself as unsafe,
 * while another thread keeps invoking the refresh callbacks.
 */
public void testNotUnsafeConcurrently() throws InterruptedException {
    final LiveVersionMap versionMap = new LiveVersionMap();
    final AtomicBoolean keepRefreshing = new AtomicBoolean(true);

    // Background loop: run the before/after refresh callbacks until asked to stop.
    final Runnable refreshLoop = () -> {
        while (keepRefreshing.get()) {
            try {
                versionMap.beforeRefresh();
                versionMap.afterRefresh(true);
            } catch (IOException ioe) {
                fail(ioe);
                throw new RuntimeException(ioe);
            }
        }
    };
    final Thread refreshThread = new Thread(refreshLoop);
    refreshThread.start();

    // 1000 iterations are enough to provoke the original version; this runs considerably more.
    final int iterations = 100000;
    try {
        for (int round = 0; round < iterations; round++) {
            final BytesRef id = Uid.encodeId(randomIdentifier());
            try (Releasable ignored = versionMap.acquireLock(id)) {
                versionMap.enforceSafeAccess();
                versionMap.maybePutIndexUnderLock(id, new IndexVersionValue(null, 0, 0, 0));
                assertFalse(versionMap.isUnsafe());
            }
        }
    } finally {
        // Always stop the refresher and wait (bounded) for it to finish.
        keepRefreshing.set(false);
        refreshThread.join(10000);
    }
    assertFalse(refreshThread.isAlive());
}
}

0 comments on commit fc87b56

Please sign in to comment.