Skip to content

Commit

Permalink
Add a blob existence cache to RemoteCache
Browse files Browse the repository at this point in the history
Bazel currently performs a full findMissingBlob call for every input of every
action that is not an immediate cache hit. This causes a significant amount
of network traffic as well as server load (unless the server caches blob
existence as well).

In particular, for a build of TensorFlow w/ remote execution over a slow-ish
network, we see multi-minute delays on actions due to these findMissingBlob
calls, and the client is not able to saturate a 50-machine cluster.

This change carefully de-duplicates findMissingBlob calls as well as file
uploads to the remote execution cluster. It is based on a previous simpler
change that did not de-duplicate *concurrent* calls. However, we have found
that concurrent calls are quite common - i.e., an action completes that
unblocks a large number of actions that have significant overlaps in their
input sets (e.g., protoc link completion unblocks all protoc compile
actions), and we were still seeing multi-minute delays on action execution.

With this change, a TensorFlow build w/ remote execution is down to ~20
minutes on a 50-machine cluster, and is able to fully saturate the cluster
(apart from an issue with connection loss, see PR bazelbuild#11957).

Change-Id: Ic39347a7a7a8dc7cfd463d78f0a80e0d26a970bc
  • Loading branch information
ulfjack committed May 26, 2021
1 parent c4975ef commit 07a221a
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -31,11 +32,14 @@
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
private final ConcurrentHashMap<Digest, ListenableFuture<Void>> uploadFutures = new ConcurrentHashMap<>();

public RemoteExecutionCache(
RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
Expand All @@ -53,23 +57,89 @@ public RemoteExecutionCache(
* <p>Note that this method is only required for remote execution, not for caching itself.
* However, remote execution uses a cache to store input files, and that may be a separate
* end-point from the executor itself, so the functionality lives here.
*
* <p>Callers should pass {@code true} for {@code checkAll} on retries as a precaution to avoid
* getting stale results from other threads.
*/
public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs)
RemoteActionExecutionContext context, MerkleTree merkleTree, Map<Digest, Message> additionalInputs, boolean checkAll)
throws IOException, InterruptedException {
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
ImmutableSet<Digest> missingDigests =
getFromFuture(cacheProtocol.findMissingDigests(context, allDigests));
List<ListenableFuture<Void>> futures = new ArrayList<>();

List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
for (Digest missingDigest : missingDigests) {
uploadFutures.add(uploadBlob(context, missingDigest, merkleTree, additionalInputs));
if (checkAll || !options.experimentalRemoteDeduplicateUploads) {
ImmutableSet<Digest> missingDigests =
getFromFuture(cacheProtocol.findMissingDigests(context, allDigests));
for (Digest missingDigest : missingDigests) {
futures.add(uploadBlob(context, missingDigest, merkleTree, additionalInputs));
}
} else {
// This code deduplicates findMissingDigests calls as well as file uploads. It works by
// distinguishing owned vs. unowned futures. Owned futures are owned by the current thread,
// i.e., the current thread was the first thread to add a future to the uploadFutures map.
// Unowned futures are those which another thread added first.
// This thread completes all owned futures, and also waits for all unowned futures (some or
// all of which may already be completed).
// Note that we add all futures (both owned and unowned) to the futures list, so we only need
// a single waitForBulkTransfer call below.
Map<Digest, SettableFuture<Void>> ownedFutures = new HashMap<>();
for (Digest d : allDigests) {
// We expect the majority of digests to already have an entry in the map. Therefore, we
// check the map first *before* we create a SettableFuture instance (to avoid unnecessary
// gc for those cases). Unfortunately, we cannot use computeIfAbsent here because we need to
// know if this thread owns the future or not.
ListenableFuture<Void> future = uploadFutures.get(d);
if (future == null) {
SettableFuture<Void> settableFuture = SettableFuture.create();
ListenableFuture<Void> previous = uploadFutures.putIfAbsent(d, settableFuture);
if (previous != null) {
// We raced and we lost.
future = previous;
} else {
ownedFutures.put(d, settableFuture);
future = settableFuture;
}
}
futures.add(future);
}

// Call findMissingDigests for all owned digests.
ImmutableSet<Digest> missingDigests;
try {
missingDigests =
getFromFuture(cacheProtocol.findMissingDigests(context, ownedFutures.keySet()));
} catch (IOException | RuntimeException e) {
// If something goes wrong in the findMissingDigests call, we need to complete the owned
// futures, otherwise other threads may hang.
for (SettableFuture<Void> future : ownedFutures.values()) {
future.setException(e);
}
throw e;
} catch (InterruptedException e) {
for (SettableFuture<Void> future : ownedFutures.values()) {
future.cancel(false);
}
throw e;
}

for (Map.Entry<Digest, SettableFuture<Void>> e : ownedFutures.entrySet()) {
Digest digest = e.getKey();
SettableFuture<Void> future = e.getValue();
if (missingDigests.contains(digest)) {
// Upload if the digest is missing from the remote cache.
ListenableFuture<Void> uploadFuture =
uploadBlob(context, digest, merkleTree, additionalInputs);
future.setFuture(uploadFuture);
} else {
// We need to complete *all* futures we own, including when they are actually present in
// the remote cache.
future.set(null);
}
}
}

waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
waitForBulkTransfer(futures, /* cancelRemainingOnInterrupt=*/ false);
}

private ListenableFuture<Void> uploadBlob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public ExecutionResult execute(
additionalInputs.put(actionDigest, action);
additionalInputs.put(commandHash, command);

remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs);
remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, true);
}

try (SilentCloseable c =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
requestBuilder.getExecutionPolicyBuilder().setPriority(remoteOptions.remoteExecutionPriority);
}
try {
AtomicBoolean isRetry = new AtomicBoolean();
return retrier.execute(
() -> {
ExecuteRequest request = requestBuilder.build();
Expand All @@ -333,7 +334,7 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
remoteActionExecutionContext.getNetworkTime().getDuration();
Stopwatch uploadTime = Stopwatch.createStarted();
remoteCache.ensureInputsPresent(
remoteActionExecutionContext, merkleTree, additionalInputs);
remoteActionExecutionContext, merkleTree, additionalInputs, isRetry.getAndSet(true));
// subtract network time consumed here to ensure wall clock during upload is not
// double
// counted, and metrics time computation does not exceed total time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,17 @@ public RemoteOutputsStrategyConverter() {
+ "that loads objects from the CAS on demand.")
public String remoteDownloadSymlinkTemplate;

// Enables the deduplicating path in RemoteExecutionCache#ensureInputsPresent: findMissingDigests
// calls and blob uploads are shared across concurrent actions via a per-digest future map.
// That path is bypassed when the caller passes checkAll == true (e.g. on retries).
@Option(
name = "experimental_remote_deduplicate_uploads",
defaultValue = "false",
category = "remote",
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.EXECUTION},
help =
"If set to true, Bazel deduplicates calls to the remote service to find missing "
+ "files and also deduplicates uploads.")
public boolean experimentalRemoteDeduplicateUploads;

// The below options are not configurable by users, only tests.
// This is part of the effort to reduce the overall number of flags.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public void onError(Throwable t) {
});

// Upload all missing inputs (that is, the virtual action input from above)
client.ensureInputsPresent(context, merkleTree, ImmutableMap.of());
client.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static java.nio.charset.StandardCharsets.ISO_8859_1;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -255,7 +256,7 @@ public void nonCachableSpawnsShouldNotBeCached_localFallback() throws Exception
runner.exec(spawn, policy);

verify(localRunner).exec(spawn, policy);
verify(cache).ensureInputsPresent(any(), any(), any());
verify(cache).ensureInputsPresent(any(), any(), any(), anyBoolean());
verifyNoMoreInteractions(cache);
}

Expand Down

0 comments on commit 07a221a

Please sign in to comment.