Skip to content

Commit

Permalink
Fix async expiration when completion races with a read
Browse files Browse the repository at this point in the history
  • Loading branch information
ben-manes committed Feb 23, 2019
1 parent 45feb12 commit d43764b
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,28 @@ private void addLink(String method, String varName) {
private void addVariableTime(String varName) {
MethodSpec getter = MethodSpec.methodBuilder("getVariableTime")
.addModifiers(Modifier.PUBLIC)
.addStatement("return $N", varName)
.addStatement("return $T.UNSAFE.getLong(this, $N)",
UNSAFE_ACCESS, offsetName(varName))
.returns(long.class)
.build();
MethodSpec setter = MethodSpec.methodBuilder("setVariableTime")
.addModifiers(Modifier.PUBLIC)
.addParameter(long.class, varName)
.addStatement("this.$N = $N", varName, varName)
.addStatement("$T.UNSAFE.putLong(this, $N, $N)",
UNSAFE_ACCESS, offsetName(varName), varName)
.build();
MethodSpec cas = MethodSpec.methodBuilder("casVariableTime")
.addModifiers(Modifier.PUBLIC)
.addParameter(long.class, "expect")
.addParameter(long.class, "update")
.returns(boolean.class)
.addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)",
varName, "expect", UNSAFE_ACCESS, offsetName(varName), "expect", "update")
.build();
context.nodeSubtype
.addMethod(getter)
.addMethod(setter);
.addMethod(setter)
.addMethod(cas);
}

private void addAccessExpiration() {
Expand Down Expand Up @@ -132,8 +143,8 @@ private void addRefreshExpiration() {
.addParameter(long.class, "expect")
.addParameter(long.class, "update")
.returns(boolean.class)
.addStatement("return $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)",
UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update")
.addStatement("return ($N == $N)\n&& $T.UNSAFE.compareAndSwapLong(this, $N, $N, $N)",
"writeTime", "expect", UNSAFE_ACCESS, offsetName("writeTime"), "expect", "update")
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,35 @@ long expireAfterRead(Node<K, V> node, @Nullable K key,
return 0L;
}

/**
* Attempts to update the access time for the entry after a read.
*
* @param node the entry in the page replacement policy
* @param key the key of the entry that was read
* @param value the value of the entry that was read
* @param expiry the calculator for the expiration time
* @param now the current time, in nanoseconds
*/
void tryExpireAfterRead(Node<K, V> node, @Nullable K key,
@Nullable V value, Expiry<K, V> expiry, long now) {
if (!expiresVariable() || (key == null) || (value == null)) {
return;
}

long variableTime = node.getVariableTime();
long currentDuration = Math.max(1, variableTime - now);
if (isAsync && (currentDuration > MAXIMUM_EXPIRY)) {
// FIXME: break test case
//return;
}

long duration = expiry.expireAfterRead(key, value, now, currentDuration);
if (duration != currentDuration) {
long expirationTime = isAsync ? (now + duration) : (now + Math.min(duration, MAXIMUM_EXPIRY));
node.casVariableTime(variableTime, expirationTime);
}
}

void setVariableTime(Node<K, V> node, long expirationTime) {
if (expiresVariable()) {
node.setVariableTime(expirationTime);
Expand Down Expand Up @@ -1823,7 +1852,7 @@ public boolean containsValue(Object value) {
@SuppressWarnings("unchecked")
K castedKey = (K) key;
setAccessTime(node, now);
setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now));
tryExpireAfterRead(node, castedKey, value, expiry(), now);
}
afterRead(node, now, recordStats);
return value;
Expand Down Expand Up @@ -1862,7 +1891,7 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
if (!isComputingAsync(node)) {
@SuppressWarnings("unchecked")
K castedKey = (K) key;
setVariableTime(node, expireAfterRead(node, castedKey, value, expiry(), now));
tryExpireAfterRead(node, castedKey, value, expiry(), now);
setAccessTime(node, now);
}
afterRead(node, now, /* recordHit */ false);
Expand Down Expand Up @@ -2274,7 +2303,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
V value = node.getValue();
if ((value != null) && !hasExpired(node, now)) {
if (!isComputingAsync(node)) {
setVariableTime(node, expireAfterRead(node, key, value, expiry(), now));
tryExpireAfterRead(node, key, value, expiry(), now);
setAccessTime(node, now);
}

Expand Down Expand Up @@ -2362,7 +2391,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
}
if (newValue[0] == null) {
if (!isComputingAsync(node)) {
setVariableTime(node, expireAfterRead(node, key, oldValue[0], expiry(), now[0]));
tryExpireAfterRead(node, key, oldValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}

Expand Down Expand Up @@ -2547,7 +2576,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
} else {
if (cause[0] == null) {
if (!isComputingAsync(node)) {
setVariableTime(node, expireAfterRead(node, key, newValue[0], expiry(), now[0]));
tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
} else if (cause[0] == RemovalCause.COLLECTED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ public long getVariableTime() {
*/
public void setVariableTime(long time) {}

/**
* Atomically sets the variable time to the given updated value if the current value equals the
* expected value and returns if the update was successful.
*/
public boolean casVariableTime(long expect, long update) {
throw new UnsupportedOperationException();
}

@GuardedBy("evictionLock")
public Node<K, V> getPreviousInVariableOrder() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2019 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.cache.issues;

import static com.github.benmanes.caffeine.testing.Awaits.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nonnull;

import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Policy.VarExpiration;

/**
* Issue #298: Stale data when using Expiry
* <p>
* When a future value in an AsyncCache is in-flight, the entry has an infinite expiration time to
* disable eviction. When it completes, a callback performs a no-op write into the cache to
* update its metadata (expiration, weight, etc). This may race with a reader who obtains a
* completed future, reads the current duration as infinite, and tries to set the expiration time
* accordingly (to indicate no change). If the writer completes before the reader updates, then we
* encounter an ABA problem where the entry is set to never expire.
*
* @author ben.manes@gmail.com (Ben Manes)
*/
@Test(groups = "isolated")
public final class Issue298Test {
AtomicBoolean startedLoad;
AtomicBoolean doLoad;

AtomicBoolean startedCreate;
AtomicBoolean doCreate;

AtomicBoolean startedRead;
AtomicBoolean doRead;

AsyncLoadingCache<String, String> cache;
String key;

@BeforeMethod
public void before() {
startedLoad = new AtomicBoolean();
startedCreate = new AtomicBoolean();
startedRead = new AtomicBoolean();
doCreate = new AtomicBoolean();
doLoad = new AtomicBoolean();
doRead = new AtomicBoolean();
cache = makeAsyncCache();
key = "key";
}

@Test
public void readDuringCreate() {
// Loaded value and waiting at expireAfterCreate (expire: infinite)
cache.get(key);
await().untilTrue(startedLoad);
doLoad.set(true);
await().untilTrue(startedCreate);

// Async read waiting at expireAfterRead
CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> cache.get(key));
await().untilTrue(startedRead);

// Ran expireAfterCreate (expire: infinite -> create)
doCreate.set(true);
VarExpiration<String, String> policy = cache.synchronous().policy().expireVariably().get();
await().until(() -> {
return (policy.getExpiresAfter(key).get().toDays() == 0L);
});

// Ran reader (expire: create -> ?)
doRead.set(true);
reader.join();

// Ensure expire is [expireAfterCreate], not [infinite]
assertThat(policy.getExpiresAfter(key).get().toDays(), is(0L));
}

private AsyncLoadingCache<String, String> makeAsyncCache() {
return Caffeine.newBuilder()
.expireAfter(new Expiry<String, String>() {
@Override public long expireAfterCreate(@Nonnull String key,
@Nonnull String value, long currentTime) {
startedCreate.set(true);
await().untilTrue(doCreate);
return Duration.ofDays(1).toNanos();
}
@Override public long expireAfterUpdate(@Nonnull String key,
@Nonnull String value, long currentTime, long currentDuration) {
return currentDuration;
}
@Override public long expireAfterRead(@Nonnull String key,
@Nonnull String value, long currentTime, long currentDuration) {
startedRead.set(true);
await().untilTrue(doRead);
return currentDuration;
}
})
.buildAsync(key -> {
startedLoad.set(true);
await().untilTrue(doLoad);
return key + "'s value";
});
}
}

0 comments on commit d43764b

Please sign in to comment.