Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suppress last-ditch download exceptions w/cleanup #10029

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import java.io.IOException;

/**
* Exception which represents a collection of IOExceptions for the purpose
* of distinguishing remote communication exceptions from those which occur
* on filesystems locally. This exception serves as a trace point for the actual
* transfer, so that the intended operation can be observed in a stack, with all
* constituent exceptions available for observation.
*/
class BulkTransferException extends IOException {
// Starts out true; this is vacuously correct because no empty BulkTransferException is ever thrown.
private boolean allCacheNotFoundException = true;

/**
* Creates an exception with no suppressed failures recorded yet. Callers attach failures
* afterwards with {@link #add(IOException)}.
*/
BulkTransferException() {
}

/**
* Creates an exception and immediately records {@code e} as its first suppressed failure.
*
* @param e the failure to record; it is passed to {@link #add(IOException)}
*/
BulkTransferException(IOException e) {
add(e);
}

/**
* Adds an IOException to the suppressed list.
*
* <p>The standard {@link Throwable#addSuppressed} is final, so this method stands in its place
* in order to also record whether all suppressed exceptions are
* {@link CacheNotFoundException}s.
*/
void add(IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this can ever be called in a multi-threaded context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cannot in any of the uses I've added here - all the aggregations happen on a single thread.

allCacheNotFoundException &= e instanceof CacheNotFoundException;
super.addSuppressed(e);
}

/**
* Returns whether every exception recorded through {@link #add(IOException)} was a
* {@link CacheNotFoundException}. The underlying flag starts out true, so this also returns
* true when nothing has been added.
*/
boolean onlyCausedByCacheNotFoundException() {
return allCacheNotFoundException;
}
}
143 changes: 67 additions & 76 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
Expand All @@ -34,6 +35,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand All @@ -57,7 +59,6 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.remote.util.Utils.InMemoryOutput;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.util.io.OutErr;
Expand Down Expand Up @@ -116,7 +117,7 @@ public RemoteCache(

public ActionResult downloadActionResult(ActionKey actionKey, boolean inlineOutErr)
throws IOException, InterruptedException {
return Utils.getFromFuture(cacheProtocol.downloadActionResult(actionKey, inlineOutErr));
return getFromFuture(cacheProtocol.downloadActionResult(actionKey, inlineOutErr));
}

/**
Expand Down Expand Up @@ -182,7 +183,7 @@ private void uploadOutputs(
digests.addAll(digestToBlobs.keySet());

ImmutableSet<Digest> digestsToUpload =
Utils.getFromFuture(cacheProtocol.findMissingDigests(digests));
getFromFuture(cacheProtocol.findMissingDigests(digests));
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
Expand All @@ -198,7 +199,7 @@ private void uploadOutputs(
}
}

waitForUploads(uploads.build());
waitForBulkTransfer(uploads.build(), /* cancelRemainingOnInterrupt=*/ false);

if (manifest.getStderrDigest() != null) {
result.setStderrDigest(manifest.getStderrDigest());
Expand All @@ -208,22 +209,44 @@ private void uploadOutputs(
}
}

private static void waitForUploads(List<ListenableFuture<Void>> uploads)
throws IOException, InterruptedException {
try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
protected static <T> void waitForBulkTransfer(Iterable<ListenableFuture<T>> transfers, boolean cancelRemainingOnInterrupt)
throws BulkTransferException, InterruptedException {
BulkTransferException bulkTransferException = null;
InterruptedException interruptedException = null;
boolean interrupted = Thread.currentThread().isInterrupted();
for (ListenableFuture<T> transfer : transfers) {
try {
if (interruptedException == null) {
// Wait for this transfer (upload or download) to finish.
getFromFuture(transfer);
} else {
transfer.cancel(true);
}
} catch (IOException e) {
if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(e);
} catch (InterruptedException e) {
interrupted = Thread.interrupted() || interrupted;
interruptedException = e;
if (!cancelRemainingOnInterrupt) {
// leave the rest of the transfers alone
break;
}
}
} catch (ExecutionException e) {
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
// between ByteStreamUploader as well.
Throwable cause = e.getCause();
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
if (cause != null) {
throw new IOException(cause);
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (interruptedException != null) {
if (bulkTransferException != null) {
interruptedException.addSuppressed(bulkTransferException);
}
throw new IOException(e);
throw interruptedException;
}
if (bulkTransferException != null) {
throw bulkTransferException;
}
}

Expand Down Expand Up @@ -299,40 +322,16 @@ public void download(
// Subsequently we need to wait for *every* download to finish, even if we already know that
// one failed. That's so that when exiting this method we can be sure that all downloads have
// finished and don't race with the cleanup routine.
// TODO(buchgr): Look into cancellation.

IOException downloadException = null;
InterruptedException interruptedException = null;
FileOutErr tmpOutErr = null;
try {
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));
} catch (IOException e) {
downloadException = e;
}

for (ListenableFuture<FileMetadata> download : downloads) {
try {
// Wait for all downloads to finish.
getFromFuture(download);
} catch (IOException e) {
if (downloadException == null) {
downloadException = e;
} else if (e != downloadException) {
downloadException.addSuppressed(e);
}
} catch (InterruptedException e) {
if (interruptedException == null) {
interruptedException = e;
} else if (e != interruptedException) {
interruptedException.addSuppressed(e);
}
}
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));

if (downloadException != null || interruptedException != null) {
try {
waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
} catch (Exception e) {
try {
// Delete any (partially) downloaded output files.
for (OutputFile file : result.getOutputFilesList()) {
Expand All @@ -347,27 +346,17 @@ public void download(
tmpOutErr.clearOut();
tmpOutErr.clearErr();
}
} catch (IOException e) {
if (downloadException != null && e != downloadException) {
e.addSuppressed(downloadException);
}
if (interruptedException != null) {
e.addSuppressed(interruptedException);
}
} catch (IOException ioEx) {
ioEx.addSuppressed(e);

// If deleting of output files failed, we abort the build with a decent error message as
// any subsequent local execution failure would likely be incomprehensible.
throw new EnvironmentalExecException(
"Failed to delete output files after incomplete download", e);
ExecException execEx = new EnvironmentalExecException(
"Failed to delete output files after incomplete download", ioEx);
execEx.addSuppressed(e);
throw execEx;
}
}

if (interruptedException != null) {
throw interruptedException;
}

if (downloadException != null) {
throw downloadException;
throw e;
}

if (tmpOutErr != null) {
Expand Down Expand Up @@ -487,12 +476,15 @@ public void onFailure(Throwable t) {
return outerF;
}

private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr)
throws IOException {
private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
try {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} catch (IOException e) {
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
Expand All @@ -501,8 +493,12 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
directExecutor()));
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
try {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} catch (IOException e) {
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
Expand Down Expand Up @@ -1115,9 +1111,4 @@ public Collection<SymlinkMetadata> symlinks() {
return symlinks.values();
}
}

@VisibleForTesting
protected <T> T getFromFuture(ListenableFuture<T> f) throws IOException, InterruptedException {
return Utils.getFromFuture(f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Digest;
Expand Down Expand Up @@ -57,21 +58,7 @@ private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blob
uploads.add(cacheProtocol.uploadBlob(entry.getKey(), entry.getValue()));
}

try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
}
} catch (ExecutionException e) {
// Cancel remaining uploads.
for (ListenableFuture<Void> upload : uploads) {
upload.cancel(/* mayInterruptIfRunning= */ true);
}

Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
Throwables.propagateIfPossible(cause, InterruptedException.class);
throw new IOException(cause);
}
waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt=*/ false);
}

/**
Expand All @@ -91,7 +78,7 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> addi
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
ImmutableSet<Digest> missingDigests =
Utils.getFromFuture(cacheProtocol.findMissingDigests(allDigests));
getFromFuture(cacheProtocol.findMissingDigests(allDigests));
Map<Digest, Path> filesToUpload = new HashMap<>();
Map<Digest, ByteString> blobsToUpload = new HashMap<>();
for (Digest missingDigest : missingDigests) {
Expand Down
Loading