Skip to content

Commit

Permalink
Deduplicate concurrent computations of the same Merkle tree.
Browse files Browse the repository at this point in the history
Currently, it's possible for concurrent actions to end up computing the same Merkle tree, even when the cache is enabled. This change makes it so that a later action waits for the completion of the computation started by an earlier action.

Progress on #17923.

Closes #17995.

PiperOrigin-RevId: 522319291
Change-Id: I68ab952ed6357027ec71a67a104f91a684a7a040
  • Loading branch information
tjgq authored and copybara-github committed Apr 6, 2023
1 parent 8e359e7 commit 6d6fa81
Showing 1 changed file with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
Expand All @@ -168,7 +170,7 @@ public class RemoteExecutionService {
@Nullable private final RemoteExecutionClient remoteExecutor;
private final TempPathGenerator tempPathGenerator;
@Nullable private final Path captureCorruptedOutputsDir;
private final Cache<Object, MerkleTree> merkleTreeCache;
private final Cache<Object, CompletableFuture<MerkleTree>> merkleTreeCache;
private final Set<String> reportedErrors = new HashSet<>();
private final Phaser backgroundTaskPhaser = new Phaser(1);

Expand Down Expand Up @@ -344,7 +346,7 @@ public boolean mayBeExecutedRemotely(Spawn spawn) {
}

@VisibleForTesting
Cache<Object, MerkleTree> getMerkleTreeCache() {
Cache<Object, CompletableFuture<MerkleTree>> getMerkleTreeCache() {
return merkleTreeCache;
}

Expand Down Expand Up @@ -418,12 +420,34 @@ private MerkleTree buildMerkleTreeVisitor(
MetadataProvider metadataProvider,
ArtifactPathResolver artifactPathResolver)
throws IOException, ForbiddenActionInputException {
MerkleTree result = merkleTreeCache.getIfPresent(nodeKey);
if (result == null) {
result = uncachedBuildMerkleTreeVisitor(walker, metadataProvider, artifactPathResolver);
merkleTreeCache.put(nodeKey, result);
// Deduplicate concurrent computations for the same node. It's not possible to use
// MerkleTreeCache#get(key, loader) because the loading computation may cause other nodes to be
// recursively looked up, which is not allowed. Instead, use a future as described at
// https://github.com/ben-manes/caffeine/wiki/Faq#recursive-computations.
var freshFuture = new CompletableFuture<MerkleTree>();
var priorFuture = merkleTreeCache.asMap().putIfAbsent(nodeKey, freshFuture);
if (priorFuture == null) {
// No preexisting cache entry, so we must do the computation ourselves.
try {
freshFuture.complete(
uncachedBuildMerkleTreeVisitor(walker, metadataProvider, artifactPathResolver));
} catch (Exception e) {
freshFuture.completeExceptionally(e);
}
}
try {
return (priorFuture != null ? priorFuture : freshFuture).join();
} catch (CompletionException e) {
Throwable cause = checkNotNull(e.getCause());
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof ForbiddenActionInputException) {
throw (ForbiddenActionInputException) cause;
} else {
checkState(cause instanceof RuntimeException);
throw (RuntimeException) cause;
}
}
return result;
}

@VisibleForTesting
Expand Down

0 comments on commit 6d6fa81

Please sign in to comment.