diff --git a/lib/trino-collect/pom.xml b/lib/trino-collect/pom.xml
index 21ce82fc9a34..eaae0419457b 100644
--- a/lib/trino-collect/pom.xml
+++ b/lib/trino-collect/pom.xml
@@ -23,6 +23,11 @@
jsr305
+
+ com.google.errorprone
+ error_prone_annotations
+
+
com.google.guavaguava
diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java
index f551998a41ae..ac1d55b96658 100644
--- a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java
+++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java
@@ -13,184 +13,229 @@
*/
package io.trino.collect.cache;
-import com.google.common.cache.AbstractCache;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.AbstractLoadingCache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
import javax.annotation.CheckForNull;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import static io.trino.collect.cache.MoreFutures.getDone;
-import static java.lang.System.nanoTime;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
/**
- * A {@link Cache} implementation similar to ones produced by {@link CacheBuilder#build()}, but one that does not exhibit
- * Guava issue #1881: a cache inspection with
- * {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} is guaranteed to return fresh state after
- * {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
+ * A {@link Cache} and {@link LoadingCache} implementation similar to ones produced by {@link CacheBuilder#build()},
+ * but one that does not exhibit Guava issue #1881:
+ * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} is guaranteed to return
+ * fresh state after {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
+ *
+ * @see EvictableCacheBuilder
*/
-public class EvictableCache
- extends AbstractCache
- implements Cache
+class EvictableCache
+ extends AbstractLoadingCache
+ implements LoadingCache
{
- /**
- * @apiNote Piggy-back on {@link CacheBuilder} for cache TTL.
- */
- public static EvictableCache buildWith(CacheBuilder super K, Object> cacheBuilder)
- {
- return new EvictableCache<>(cacheBuilder);
- }
-
- // private final Map> map = new ConcurrentHashMap<>();
- private final Cache> delegate;
-
- private final StatsCounter statsCounter = new SimpleStatsCounter();
-
- private EvictableCache(CacheBuilder super K, Object> cacheBuilder)
+ // Invariant: for every (K, token) entry in the tokens map, there is a live
+ // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens'
+ // entry to be removed.
+ private final ConcurrentHashMap> tokens = new ConcurrentHashMap<>();
+ // The dataCache can have entries with no corresponding tokens in the tokens map.
+ // For example, this can happen when invalidation concurs with load.
+ // The dataCache must be bounded.
+ private final LoadingCache, V> dataCache;
+
+ EvictableCache(CacheBuilder super Token, ? super V> cacheBuilder, CacheLoader super K, V> cacheLoader)
{
- requireNonNull(cacheBuilder, "cacheBuilder is null");
- this.delegate = buildUnsafeCache(cacheBuilder);
+ dataCache = buildUnsafeCache(
+ cacheBuilder
+ ., V>removalListener(removal -> {
+ Token token = removal.getKey();
+ verify(token != null, "token is null");
+ tokens.remove(token.getKey(), token);
+ }),
+ new TokenCacheLoader<>(cacheLoader));
}
- @SuppressModernizer // CacheBuilder.build() is forbidden, advising to use this class as a safety-adding wrapper.
- private static Cache buildUnsafeCache(CacheBuilder super K, ? super V> cacheBuilder)
+ @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, advising to use this class as a safety-adding wrapper.
+ private static LoadingCache buildUnsafeCache(CacheBuilder super K, ? super V> cacheBuilder, CacheLoader super K, V> cacheLoader)
{
- return cacheBuilder.build();
+ return cacheBuilder.build(cacheLoader);
}
@CheckForNull
@Override
public V getIfPresent(Object key)
{
- Future future = delegate.getIfPresent(key);
- if (future != null && future.isDone()) {
- statsCounter.recordHits(1);
- return getDone(future);
+ @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+ Token token = tokens.get(key);
+ if (token == null) {
+ return null;
}
- statsCounter.recordMisses(1);
- return null;
+ return dataCache.getIfPresent(token);
}
@Override
- public V get(K key, Callable extends V> loader)
+ public V get(K key, Callable extends V> valueLoader)
throws ExecutionException
{
- requireNonNull(key, "key is null");
- requireNonNull(loader, "loader is null");
-
- while (true) {
- SettableFuture newFuture = SettableFuture.create();
- Future future = delegate.asMap().computeIfAbsent(key, ignored -> newFuture);
- if (future.isDone() && !future.isCancelled()) {
- statsCounter.recordHits(1);
- return getDone(future);
+ Token newToken = new Token<>(key);
+ Token token = tokens.computeIfAbsent(key, ignored -> newToken);
+ try {
+ return dataCache.get(token, valueLoader);
+ }
+ catch (Throwable e) {
+ if (newToken == token) {
+ // Failed to load and it was our new token persisted in tokens map.
+ // No cache entry exists for the token (unless concurrent load happened),
+ // so we need to remove it.
+ tokens.remove(key, newToken);
}
+ throw e;
+ }
+ }
- statsCounter.recordMisses(1);
- if (future == newFuture) {
- // We put the future in.
+ @Override
+ public V get(K key)
+ throws ExecutionException
+ {
+ Token newToken = new Token<>(key);
+ Token token = tokens.computeIfAbsent(key, ignored -> newToken);
+ try {
+ return dataCache.get(token);
+ }
+ catch (Throwable e) {
+ if (newToken == token) {
+ // Failed to load and it was our new token persisted in tokens map.
+ // No cache entry exists for the token (unless concurrent load happened),
+ // so we need to remove it.
+ tokens.remove(key, newToken);
+ }
+ throw e;
+ }
+ }
- V computed;
- long loadStartNanos = nanoTime();
- try {
- computed = loader.call();
- requireNonNull(computed, "computed is null");
- }
- catch (Exception e) {
- statsCounter.recordLoadException(nanoTime() - loadStartNanos);
- delegate.asMap().remove(key, newFuture);
- // wake up waiters, let them retry
- newFuture.cancel(false);
- throw new ExecutionException(e);
+ @Override
+ public ImmutableMap getAll(Iterable extends K> keys)
+ throws ExecutionException
+ {
+ List> newTokens = new ArrayList<>();
+ try {
+ BiMap> keyToToken = HashBiMap.create();
+ for (K key : keys) {
+ // This is not bulk, but is fast local operation
+ Token newToken = new Token<>(key);
+ Token token = tokens.computeIfAbsent(key, ignored -> newToken);
+ keyToToken.put(key, token);
+ if (token == newToken) {
+ newTokens.add(newToken);
}
- statsCounter.recordLoadSuccess(nanoTime() - loadStartNanos);
- newFuture.set(computed);
- return computed;
}
- // Someone else is loading the key, let's wait.
- try {
- return future.get();
- }
- catch (CancellationException e) {
- // Invalidated, or load failed
- }
- catch (ExecutionException e) {
- // Should never happen
- throw new IllegalStateException("Future unexpectedly completed with exception", e);
+ Map, V> values = dataCache.getAll(keyToToken.values());
+
+ BiMap, K> tokenToKey = keyToToken.inverse();
+ ImmutableMap.Builder result = ImmutableMap.builder();
+ for (Map.Entry, V> entry : values.entrySet()) {
+ Token token = entry.getKey();
+
+ // While token.getKey() returns equal key, a caller may expect us to maintain key identity, in case equal keys are still distinguishable.
+ K key = tokenToKey.get(token);
+ checkState(key != null, "No key found for %s in %s when loading %s", token, tokenToKey, keys);
+
+ result.put(key, entry.getValue());
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted", e);
+ return result.buildOrThrow();
+ }
+ catch (Throwable e) {
+ for (Token token : newTokens) {
+ // Failed to load and it was our new token persisted in tokens map.
+ // No cache entry exists for the token (unless concurrent load happened),
+ // so we need to remove it.
+ tokens.remove(token.getKey(), token);
}
-
- // Someone else was loading the key, but the load was invalidated.
+ throw e;
}
}
@Override
- public void put(K key, V value)
+ public void refresh(K key)
{
- throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead.");
+ // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token.
+ // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created.
+ // In such case we would leak the newly created token.
+ throw new UnsupportedOperationException();
}
@Override
- public void invalidate(Object key)
+ public long size()
{
- delegate.invalidate(key);
+ return dataCache.size();
}
@Override
- public void invalidateAll(Iterable> keys)
+ public void cleanUp()
{
- delegate.invalidateAll(keys);
+ dataCache.cleanUp();
+ }
+
+ @VisibleForTesting
+ int tokensCount()
+ {
+ return tokens.size();
}
@Override
- public void invalidateAll()
+ public void invalidate(Object key)
{
- delegate.invalidateAll();
+ @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+ Token token = tokens.remove(key);
+ if (token != null) {
+ dataCache.invalidate(token);
+ }
}
@Override
- public long size()
+ public void invalidateAll()
{
- // Includes entries being computed. Approximate, as allowed per method contract.
- return delegate.size();
+ dataCache.invalidateAll();
+ tokens.clear();
}
@Override
public CacheStats stats()
{
- return statsCounter.snapshot().plus(
- new CacheStats(
- 0,
- 0,
- 0,
- 0,
- 0,
- delegate.stats().evictionCount()));
+ return dataCache.stats();
}
@Override
public ConcurrentMap asMap()
{
- ConcurrentMap> delegate = this.delegate.asMap();
return new ConcurrentMap()
{
+ private final ConcurrentMap, V> dataCacheMap = dataCache.asMap();
+
@Override
public V putIfAbsent(K key, V value)
{
@@ -200,14 +245,22 @@ public V putIfAbsent(K key, V value)
@Override
public boolean remove(Object key, Object value)
{
- // We could use delegate.compute(key, ..) to check existence and remove, but compute takes `K key` and we have `Object`
- throw new UnsupportedOperationException();
+ @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+ Token token = tokens.get(key);
+ if (token != null) {
+ return dataCacheMap.remove(token, value);
+ }
+ return false;
}
@Override
public boolean replace(K key, V oldValue, V newValue)
{
- throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation");
+ Token token = tokens.get(key);
+ if (token != null) {
+ return dataCacheMap.replace(token, oldValue, newValue);
+ }
+ return false;
}
@Override
@@ -219,30 +272,25 @@ public V replace(K key, V value)
@Override
public int size()
{
- return delegate.size();
+ return dataCache.asMap().size();
}
@Override
public boolean isEmpty()
{
- return delegate.isEmpty();
+ return dataCache.asMap().isEmpty();
}
@Override
public boolean containsKey(Object key)
{
- return delegate.containsKey(key);
+ return tokens.containsKey(key);
}
@Override
public boolean containsValue(Object value)
{
- for (Future future : delegate.values()) {
- if (future.isDone() && !future.isCancelled() && Objects.equals(getDone(future), value)) {
- return true;
- }
- }
- return false;
+ return values().contains(value);
}
@Override
@@ -260,9 +308,9 @@ public V put(K key, V value)
@Override
public V remove(Object key)
{
- Future future = delegate.remove(key);
- if (future != null && future.isDone() && !future.isCancelled()) {
- return getDone(future);
+ Token token = tokens.remove(key);
+ if (token != null) {
+ return dataCacheMap.remove(token);
}
return null;
}
@@ -276,33 +324,106 @@ public void putAll(Map extends K, ? extends V> m)
@Override
public void clear()
{
- delegate.clear();
+ dataCacheMap.clear();
+ tokens.clear();
}
@Override
public Set keySet()
{
- return delegate.keySet();
+ return tokens.keySet();
}
@Override
public Collection values()
{
- // values() should be a view, but also, it has a size and, iterating values shouldn't throw for incomplete futures.
- throw new UnsupportedOperationException();
+ return dataCacheMap.values();
}
@Override
- public Set> entrySet()
+ public Set> entrySet()
{
throw new UnsupportedOperationException();
}
};
}
- @Override
- public void cleanUp()
+ // instance-based equality
+ static final class Token
+ {
+ private final K key;
+
+ Token(K key)
+ {
+ this.key = requireNonNull(key, "key is null");
+ }
+
+ K getKey()
+ {
+ return key;
+ }
+
+ @Override
+ public String toString()
+ {
+ return format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key);
+ }
+ }
+
+ private static class TokenCacheLoader
+ extends CacheLoader, V>
{
- delegate.cleanUp();
+ private final CacheLoader super K, V> delegate;
+
+ public TokenCacheLoader(CacheLoader super K, V> delegate)
+ {
+ this.delegate = requireNonNull(delegate, "delegate is null");
+ }
+
+ @Override
+ public V load(Token token)
+ throws Exception
+ {
+ return delegate.load(token.getKey());
+ }
+
+ @Override
+ public ListenableFuture reload(Token token, V oldValue)
+ throws Exception
+ {
+ return delegate.reload(token.getKey(), oldValue);
+ }
+
+ @Override
+ public Map, V> loadAll(Iterable extends Token> tokens)
+ throws Exception
+ {
+ List> tokenList = ImmutableList.copyOf(tokens);
+ List keys = new ArrayList<>();
+ for (Token token : tokenList) {
+ keys.add(token.getKey());
+ }
+ Map super K, V> values = delegate.loadAll(keys);
+
+ ImmutableMap.Builder, V> result = ImmutableMap.builder();
+ for (int i = 0; i < tokenList.size(); i++) {
+ Token token = tokenList.get(i);
+ K key = keys.get(i);
+ V value = values.get(key);
+ // CacheLoader.loadAll is not guaranteed to return values for all the keys
+ if (value != null) {
+ result.put(token, value);
+ }
+ }
+ return result.buildOrThrow();
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .addValue(delegate)
+ .toString();
+ }
}
}
diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java
new file mode 100644
index 000000000000..fbc4dc39be21
--- /dev/null
+++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.collect.cache;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.errorprone.annotations.CheckReturnValue;
+import io.trino.collect.cache.EvictableCache.Token;
+import org.gaul.modernizer_maven_annotations.SuppressModernizer;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder},
+ * but creating cache implementations that do not exhibit Guava issue #1881:
+ * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)} is guaranteed to return
+ * fresh state after {@link Cache#invalidate(Object)}, {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called.
+ */
+public final class EvictableCacheBuilder
+{
+ @CheckReturnValue
+ public static EvictableCacheBuilder