diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 4f567c13cc5802..6c6000ccb34c89 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -38,7 +38,6 @@ import com.google.common.collect.Iterables; import com.google.common.hash.HashCode; import com.google.common.hash.HashingOutputStream; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -246,10 +245,6 @@ public ListenableFuture<ActionResult> downloadActionResult( () -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request)))); } - private static String digestToString(Digest digest) { - return digest.getHash() + "/" + digest.getSizeBytes(); - } - @Override public void uploadActionResult(ActionKey actionKey, ActionResult actionResult) throws IOException, InterruptedException { @@ -273,11 +268,6 @@ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) { if (digest.getSizeBytes() == 0) { return Futures.immediateFuture(null); } - String resourceName = ""; - if (!options.remoteInstanceName.isEmpty()) { - resourceName += options.remoteInstanceName + "/"; - } - resourceName += "blobs/" + digestToString(digest); @Nullable Supplier<HashCode> hashSupplier = null; if (options.remoteVerifyDownloads) { @@ -286,29 +276,10 @@ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) { out = hashOut; } - SettableFuture<Void> outerF = SettableFuture.create(); - Futures.addCallback( - downloadBlob(resourceName, digest, out, hashSupplier), - new FutureCallback<Void>() { - @Override - public void onSuccess(Void result) { - outerF.set(null); - } - - @Override - public void onFailure(Throwable t) { - if (t instanceof StatusRuntimeException) { - t = new IOException(t); - } - outerF.setException(t); - } - }, - Context.current().fixedContextExecutor(MoreExecutors.directExecutor())); - return outerF; + return downloadBlob(digest, out, hashSupplier); } private ListenableFuture<Void> downloadBlob( - String resourceName, Digest digest, OutputStream out, @Nullable Supplier<HashCode> hashSupplier) { @@ -318,23 +289,28 @@ private ListenableFuture<Void> downloadBlob( return Futures.catchingAsync( retrier.executeAsync( () -> - ctx.call( - () -> - requestRead( - resourceName, offset, progressiveBackoff, digest, out, hashSupplier)), + ctx.call(() -> requestRead(offset, progressiveBackoff, digest, out, hashSupplier)), progressiveBackoff), StatusRuntimeException.class, (e) -> Futures.immediateFailedFuture(new IOException(e)), MoreExecutors.directExecutor()); } + public static String getResourceName(String instanceName, Digest digest) { + String resourceName = ""; + if (!instanceName.isEmpty()) { + resourceName += instanceName + "/"; + } + return resourceName + "blobs/" + DigestUtil.toString(digest); + } + private ListenableFuture<Void> requestRead( - String resourceName, AtomicLong offset, ProgressiveBackoff progressiveBackoff, Digest digest, OutputStream out, @Nullable Supplier<HashCode> hashSupplier) { + String resourceName = getResourceName(options.remoteInstanceName, digest); SettableFuture<Void> future = SettableFuture.create(); bsAsyncStub() .read( diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java index caba306cbe576d..e104671786de78 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java @@ -213,7 +213,7 @@ protected static <T> void waitForBulkTransfer( for (ListenableFuture<T> transfer : transfers) { try { if (interruptedException == null) { - // Wait for all downloads to finish. + // Wait for all transfers to finish. getFromFuture(transfer); } else { transfer.cancel(true); @@ -680,6 +680,8 @@ private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult directExecutor())); } + waitForBulkTransfer(dirMetadataDownloads.values(), /* cancelRemainingOnInterrupt=*/ true); + ImmutableMap.Builder<Path, DirectoryMetadata> directories = ImmutableMap.builder(); for (Map.Entry<Path, ListenableFuture<Tree>> metadataDownload : dirMetadataDownloads.entrySet()) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index abf317015487b8..c6b6357cb47550 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java @@ -55,16 +55,17 @@ public static <T> T getFromFuture(ListenableFuture<T> f) try { return f.get(); } catch (ExecutionException e) { - if (e.getCause() instanceof InterruptedException) { - throw (InterruptedException) e.getCause(); + Throwable cause = e.getCause(); + if (cause instanceof InterruptedException) { + throw (InterruptedException) cause; } - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; } - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; } - throw new IOException(e.getCause()); + throw new IOException(cause); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index 737e62b4e4c80e..eec57b51f5c73d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.remote.GrpcCacheClient.getResourceName; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.answerVoid; @@ -27,14 +28,18 @@ import build.bazel.remote.execution.v2.Command; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.Directory; import build.bazel.remote.execution.v2.ExecuteRequest; import build.bazel.remote.execution.v2.ExecuteResponse; import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase; +import build.bazel.remote.execution.v2.FileNode; import build.bazel.remote.execution.v2.FindMissingBlobsRequest; import build.bazel.remote.execution.v2.FindMissingBlobsResponse; import build.bazel.remote.execution.v2.GetActionResultRequest; +import build.bazel.remote.execution.v2.OutputDirectory; import build.bazel.remote.execution.v2.OutputFile; import build.bazel.remote.execution.v2.RequestMetadata; +import build.bazel.remote.execution.v2.Tree; import build.bazel.remote.execution.v2.WaitExecutionRequest; import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; @@ -127,6 +132,7 @@ public class GrpcRemoteExecutionClientTest { private Command command; private RemoteSpawnRunner client; private FileOutErr outErr; + private RemoteOptions remoteOptions; private Server fakeServer; private ListeningScheduledExecutorService retryService; @@ -140,6 +146,25 @@ public class GrpcRemoteExecutionClientTest { .build()) .build(); + private static final Tree DUMMY_OUTPUT_TREE = + Tree.newBuilder() + .setRoot( + Directory.newBuilder() + .addFiles( + FileNode.newBuilder() + .setName(DUMMY_OUTPUT.getPath()) + .setDigest(DUMMY_OUTPUT.getDigest()) + .setIsExecutable(true) + .build()) + .build()) + .build(); + + private static final OutputDirectory DUMMY_OUTPUT_DIRECTORY = + OutputDirectory.newBuilder() + .setPath("dummy") + .setTreeDigest(DIGEST_UTIL.compute(DUMMY_OUTPUT_TREE)) + .build(); + @Before public final void setUp() throws Exception { String fakeServerName = "fake server for " + getClass(); @@ -205,7 +230,7 @@ public PathFragment getExecPath() { FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory()); FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); outErr = new FileOutErr(stdout, stderr); - RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); + remoteOptions = Options.getDefaults(RemoteOptions.class); remoteOptions.remoteHeaders = ImmutableList.of( @@ -1011,12 +1036,12 @@ public void findMissingBlobs( responseObserver.onCompleted(); } }); + String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest); serviceRegistry.addService( new ByteStreamImplBase() { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { - assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest))) - .isTrue(); + assertThat(request.getResourceName()).isEqualTo(stdOutResourceName); responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); } }); @@ -1072,12 +1097,12 @@ public void findMissingBlobs( responseObserver.onCompleted(); } }); + String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest); serviceRegistry.addService( new ByteStreamImplBase() { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { - assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest))) - .isTrue(); + assertThat(request.getResourceName()).isEqualTo(stdOutResourceName); responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); } }); @@ -1184,6 +1209,100 @@ public void findMissingBlobs( assertThat(numExecuteCalls.get()).isEqualTo(1); } + @Test + public void remotelyReExecuteOrphanedDirectoryCachedActions() throws Exception { + final ActionResult actionResult = + ActionResult.newBuilder().addOutputDirectories(DUMMY_OUTPUT_DIRECTORY).build(); + serviceRegistry.addService( + new ActionCacheImplBase() { + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + responseObserver.onNext(actionResult); + responseObserver.onCompleted(); + } + }); + String dummyTreeResourceName = + getResourceName(remoteOptions.remoteInstanceName, DUMMY_OUTPUT_DIRECTORY.getTreeDigest()); + serviceRegistry.addService( + new ByteStreamImplBase() { + private boolean first = true; + + @Override + public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + String resourceName = request.getResourceName(); + if (resourceName.equals(dummyTreeResourceName)) { + // First read is a cache miss, next read succeeds. + if (first) { + first = false; + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); + } else { + responseObserver.onNext( + ReadResponse.newBuilder().setData(DUMMY_OUTPUT_TREE.toByteString()).build()); + responseObserver.onCompleted(); + } + } else { + responseObserver.onNext(ReadResponse.getDefaultInstance()); + } + } + + @Override + public StreamObserver<WriteRequest> write( + StreamObserver<WriteResponse> responseObserver) { + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) {} + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + }; + } + }); + AtomicInteger numExecuteCalls = new AtomicInteger(); + serviceRegistry.addService( + new ExecutionImplBase() { + @Override + public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { + numExecuteCalls.incrementAndGet(); + assertThat(request.getSkipCacheLookup()).isTrue(); // Action will be re-executed. + responseObserver.onNext( + Operation.newBuilder() + .setDone(true) + .setResponse( + Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build())) + .build()); + responseObserver.onCompleted(); + } + }); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + // Nothing is missing. + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + }); + + FakeSpawnExecutionContext policy = + new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr); + + SpawnResult result = client.exec(simpleSpawn, policy); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); + assertThat(result.isCacheHit()).isFalse(); + assertThat(numExecuteCalls.get()).isEqualTo(1); + } + @Test public void retryUploadAndExecuteOnMissingInputs() throws Exception { serviceRegistry.addService( diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java index 3db0bf5f7dfe6b..af617f49a95cbc 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java @@ -1045,9 +1045,9 @@ public void testDownloadMinimalDirectoryFails() throws Exception { MetadataInjector injector = mock(MetadataInjector.class); // act - IOException e = + BulkTransferException e = assertThrows( - IOException.class, + BulkTransferException.class, () -> remoteCache.downloadMinimal( r, @@ -1057,7 +1057,8 @@ public void testDownloadMinimalDirectoryFails() throws Exception { execRoot, injector, outputFilesLocker)); - assertThat(e).isEqualTo(downloadTreeException); + assertThat(e.getSuppressed()).hasLength(1); + assertThat(e.getSuppressed()[0]).isEqualTo(downloadTreeException); verify(outputFilesLocker, never()).lock(); }