Skip to content

Commit

Permalink
Guard parseActionResultMetadata with bulk wrapper
Browse files Browse the repository at this point in the history
ActionResult Orphaned Output Directories must be wrapped in
BulkTransferExceptions in order to be eligible for re-execution. This
change prevents an unavoidable build failure introduced with #10029 in
the event that an output directory tree specified in an action result
is missing from the CAS, and adds testing that would detect such a
regression.

Closes #11140.

PiperOrigin-RevId: 307027914
  • Loading branch information
George Gensure authored and copybara-github committed Apr 17, 2020
1 parent 266e5ff commit 24f97e1
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.Iterables;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashingOutputStream;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -246,10 +245,6 @@ public ListenableFuture<ActionResult> downloadActionResult(
() -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request))));
}

private static String digestToString(Digest digest) {
return digest.getHash() + "/" + digest.getSizeBytes();
}

@Override
public void uploadActionResult(ActionKey actionKey, ActionResult actionResult)
throws IOException, InterruptedException {
Expand All @@ -273,11 +268,6 @@ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
resourceName += "blobs/" + digestToString(digest);

@Nullable Supplier<HashCode> hashSupplier = null;
if (options.remoteVerifyDownloads) {
Expand All @@ -286,29 +276,10 @@ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
out = hashOut;
}

SettableFuture<Void> outerF = SettableFuture.create();
Futures.addCallback(
downloadBlob(resourceName, digest, out, hashSupplier),
new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
outerF.set(null);
}

@Override
public void onFailure(Throwable t) {
if (t instanceof StatusRuntimeException) {
t = new IOException(t);
}
outerF.setException(t);
}
},
Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
return outerF;
return downloadBlob(digest, out, hashSupplier);
}

private ListenableFuture<Void> downloadBlob(
String resourceName,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
Expand All @@ -318,23 +289,28 @@ private ListenableFuture<Void> downloadBlob(
return Futures.catchingAsync(
retrier.executeAsync(
() ->
ctx.call(
() ->
requestRead(
resourceName, offset, progressiveBackoff, digest, out, hashSupplier)),
ctx.call(() -> requestRead(offset, progressiveBackoff, digest, out, hashSupplier)),
progressiveBackoff),
StatusRuntimeException.class,
(e) -> Futures.immediateFailedFuture(new IOException(e)),
MoreExecutors.directExecutor());
}

public static String getResourceName(String instanceName, Digest digest) {
String resourceName = "";
if (!instanceName.isEmpty()) {
resourceName += instanceName + "/";
}
return resourceName + "blobs/" + DigestUtil.toString(digest);
}

private ListenableFuture<Void> requestRead(
String resourceName,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
String resourceName = getResourceName(options.remoteInstanceName, digest);
SettableFuture<Void> future = SettableFuture.create();
bsAsyncStub()
.read(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ protected static <T> void waitForBulkTransfer(
for (ListenableFuture<T> transfer : transfers) {
try {
if (interruptedException == null) {
// Wait for all downloads to finish.
// Wait for all transfers to finish.
getFromFuture(transfer);
} else {
transfer.cancel(true);
Expand Down Expand Up @@ -680,6 +680,8 @@ private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult
directExecutor()));
}

waitForBulkTransfer(dirMetadataDownloads.values(), /* cancelRemainingOnInterrupt=*/ true);

ImmutableMap.Builder<Path, DirectoryMetadata> directories = ImmutableMap.builder();
for (Map.Entry<Path, ListenableFuture<Tree>> metadataDownload :
dirMetadataDownloads.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,17 @@ public static <T> T getFromFuture(ListenableFuture<T> f)
try {
return f.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof InterruptedException) {
throw (InterruptedException) e.getCause();
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
throw (InterruptedException) cause;
}
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new IOException(e.getCause());
throw new IOException(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;

import static com.google.common.truth.Truth.assertThat;
import static com.google.devtools.build.lib.remote.GrpcCacheClient.getResourceName;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.answerVoid;
Expand All @@ -27,14 +28,18 @@
import build.bazel.remote.execution.v2.Command;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import build.bazel.remote.execution.v2.ExecuteRequest;
import build.bazel.remote.execution.v2.ExecuteResponse;
import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
import build.bazel.remote.execution.v2.FileNode;
import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
import build.bazel.remote.execution.v2.OutputDirectory;
import build.bazel.remote.execution.v2.OutputFile;
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.Tree;
import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
Expand Down Expand Up @@ -127,6 +132,7 @@ public class GrpcRemoteExecutionClientTest {
private Command command;
private RemoteSpawnRunner client;
private FileOutErr outErr;
private RemoteOptions remoteOptions;
private Server fakeServer;
private ListeningScheduledExecutorService retryService;

Expand All @@ -140,6 +146,25 @@ public class GrpcRemoteExecutionClientTest {
.build())
.build();

private static final Tree DUMMY_OUTPUT_TREE =
Tree.newBuilder()
.setRoot(
Directory.newBuilder()
.addFiles(
FileNode.newBuilder()
.setName(DUMMY_OUTPUT.getPath())
.setDigest(DUMMY_OUTPUT.getDigest())
.setIsExecutable(true)
.build())
.build())
.build();

private static final OutputDirectory DUMMY_OUTPUT_DIRECTORY =
OutputDirectory.newBuilder()
.setPath("dummy")
.setTreeDigest(DIGEST_UTIL.compute(DUMMY_OUTPUT_TREE))
.build();

@Before
public final void setUp() throws Exception {
String fakeServerName = "fake server for " + getClass();
Expand Down Expand Up @@ -205,7 +230,7 @@ public PathFragment getExecPath() {
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
remoteOptions = Options.getDefaults(RemoteOptions.class);

remoteOptions.remoteHeaders =
ImmutableList.of(
Expand Down Expand Up @@ -1011,12 +1036,12 @@ public void findMissingBlobs(
responseObserver.onCompleted();
}
});
String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest);
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
.isTrue();
assertThat(request.getResourceName()).isEqualTo(stdOutResourceName);
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
}
});
Expand Down Expand Up @@ -1072,12 +1097,12 @@ public void findMissingBlobs(
responseObserver.onCompleted();
}
});
String stdOutResourceName = getResourceName(remoteOptions.remoteInstanceName, stdOutDigest);
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
assertThat(request.getResourceName().contains(DigestUtil.toString(stdOutDigest)))
.isTrue();
assertThat(request.getResourceName()).isEqualTo(stdOutResourceName);
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
}
});
Expand Down Expand Up @@ -1184,6 +1209,100 @@ public void findMissingBlobs(
assertThat(numExecuteCalls.get()).isEqualTo(1);
}

@Test
public void remotelyReExecuteOrphanedDirectoryCachedActions() throws Exception {
final ActionResult actionResult =
ActionResult.newBuilder().addOutputDirectories(DUMMY_OUTPUT_DIRECTORY).build();
serviceRegistry.addService(
new ActionCacheImplBase() {
@Override
public void getActionResult(
GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
responseObserver.onNext(actionResult);
responseObserver.onCompleted();
}
});
String dummyTreeResourceName =
getResourceName(remoteOptions.remoteInstanceName, DUMMY_OUTPUT_DIRECTORY.getTreeDigest());
serviceRegistry.addService(
new ByteStreamImplBase() {
private boolean first = true;

@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
String resourceName = request.getResourceName();
if (resourceName.equals(dummyTreeResourceName)) {
// First read is a cache miss, next read succeeds.
if (first) {
first = false;
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
} else {
responseObserver.onNext(
ReadResponse.newBuilder().setData(DUMMY_OUTPUT_TREE.toByteString()).build());
responseObserver.onCompleted();
}
} else {
responseObserver.onNext(ReadResponse.getDefaultInstance());
}
}

@Override
public StreamObserver<WriteRequest> write(
StreamObserver<WriteResponse> responseObserver) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest request) {}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}

@Override
public void onError(Throwable t) {
fail("An error occurred: " + t);
}
};
}
});
AtomicInteger numExecuteCalls = new AtomicInteger();
serviceRegistry.addService(
new ExecutionImplBase() {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
numExecuteCalls.incrementAndGet();
assertThat(request.getSkipCacheLookup()).isTrue(); // Action will be re-executed.
responseObserver.onNext(
Operation.newBuilder()
.setDone(true)
.setResponse(
Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
.build());
responseObserver.onCompleted();
}
});
serviceRegistry.addService(
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
// Nothing is missing.
responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
responseObserver.onCompleted();
}
});

FakeSpawnExecutionContext policy =
new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr);

SpawnResult result = client.exec(simpleSpawn, policy);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(result.isCacheHit()).isFalse();
assertThat(numExecuteCalls.get()).isEqualTo(1);
}

@Test
public void retryUploadAndExecuteOnMissingInputs() throws Exception {
serviceRegistry.addService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,9 +1045,9 @@ public void testDownloadMinimalDirectoryFails() throws Exception {
MetadataInjector injector = mock(MetadataInjector.class);

// act
IOException e =
BulkTransferException e =
assertThrows(
IOException.class,
BulkTransferException.class,
() ->
remoteCache.downloadMinimal(
r,
Expand All @@ -1057,7 +1057,8 @@ public void testDownloadMinimalDirectoryFails() throws Exception {
execRoot,
injector,
outputFilesLocker));
assertThat(e).isEqualTo(downloadTreeException);
assertThat(e.getSuppressed()).hasLength(1);
assertThat(e.getSuppressed()[0]).isEqualTo(downloadTreeException);

verify(outputFilesLocker, never()).lock();
}
Expand Down

0 comments on commit 24f97e1

Please sign in to comment.