Skip to content

Commit

Permalink
Remove RetryException.
Browse files Browse the repository at this point in the history
Retries should be an implementation detail of the cache client implementation. No client is interested in the number of retry attempts, so it seems better to make the retry code transparent with respect to exceptions. The retry code also wrapped arbitrary exceptions with RetryException. It is now the responsibility of the cache implementation code to convert low-level exceptions to IOException or an error meaningful at higher layers like CacheNotFoundException. For example, the gRPC action cache and remote executor now explicitly convert StatusRuntimeException into IOException.

Also, remove PassThroughException, since it's unused.

This should fix #6308.

Closes #7009.

PiperOrigin-RevId: 230728718
  • Loading branch information
benjaminp authored and Copybara-Service committed Jan 24, 2019
1 parent 8125da5 commit 1532df0
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
((SettableFuture<byte[]>) EMPTY_BYTES).set(new byte[0]);
}

public static boolean causedByCacheMiss(IOException t) {
return t.getCause() instanceof CacheNotFoundException;
}

protected final RemoteOptions options;
protected final DigestUtil digestUtil;
private final Retrier retrier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
Expand All @@ -42,6 +41,7 @@
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
Expand Down Expand Up @@ -130,7 +130,6 @@ public ByteStreamUploader(
* @param forceUpload if {@code false} the blob is not uploaded if it has previously been
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
* @throws RetryException when the upload failed after a retry
*/
public void uploadBlob(Chunker chunker, boolean forceUpload) throws IOException,
InterruptedException {
Expand All @@ -151,12 +150,11 @@ public void uploadBlob(Chunker chunker, boolean forceUpload) throws IOException,
*
* @param chunkers the data to upload.
* @param forceUpload if {@code false} the blob is not uploaded if it has previously been
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
* @throws RetryException when the upload failed after a retry
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source or uploading fails
*/
public void uploadBlobs(Iterable<Chunker> chunkers, boolean forceUpload) throws IOException,
InterruptedException {
public void uploadBlobs(Iterable<Chunker> chunkers, boolean forceUpload)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();

for (Chunker chunker : chunkers) {
Expand All @@ -170,7 +168,11 @@ public void uploadBlobs(Iterable<Chunker> chunkers, boolean forceUpload) throws
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfInstanceOf(cause, IOException.class);
throw new RuntimeException(cause);
Throwables.propagateIfInstanceOf(cause, InterruptedException.class);
if (cause instanceof StatusRuntimeException) {
throw new IOException(cause);
}
Throwables.propagate(cause);
}
}

Expand Down Expand Up @@ -212,7 +214,6 @@ void shutdown() {
* @param forceUpload if {@code false} the blob is not uploaded if it has previously been
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
* @throws RetryException when the upload failed after a retry
*/
public ListenableFuture<Void> uploadBlobAsync(Chunker chunker, boolean forceUpload) {
Digest digest = checkNotNull(chunker.digest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
Expand Down Expand Up @@ -323,15 +322,19 @@ public void upload(
if (!uploadAction) {
return;
}
retrier.execute(
() ->
acBlockingStub()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setActionResult(result)
.build()));
try {
retrier.execute(
() ->
acBlockingStub()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setActionResult(result)
.build()));
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}

void upload(
Expand Down Expand Up @@ -433,12 +436,12 @@ public ActionResult getCachedActionResult(ActionKey actionKey)
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.build()));
} catch (RetryException e) {
if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
// Return null to indicate that it was a cache miss.
return null;
}
throw e;
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,52 +136,56 @@ public ExecuteResponse executeRemotely(ExecuteRequest request)
new AtomicReference<>(Operation.getDefaultInstance());
final AtomicBoolean waitExecution =
new AtomicBoolean(false); // Whether we should call WaitExecution.
return retrier.execute(
() -> {
final Iterator<Operation> replies;
if (waitExecution.get()) {
WaitExecutionRequest wr =
WaitExecutionRequest.newBuilder().setName(operation.get().getName()).build();
replies = execBlockingStub().waitExecution(wr);
} else {
replies = execBlockingStub().execute(request);
}
try {
while (replies.hasNext()) {
Operation o = replies.next();
operation.set(o);
waitExecution.set(!operation.get().getDone());
ExecuteResponse r = getOperationResponse(o);
if (r != null) {
return r;
}
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Code.NOT_FOUND) {
// Operation was lost on the server. Retry Execute.
waitExecution.set(false);
try {
return retrier.execute(
() -> {
final Iterator<Operation> replies;
if (waitExecution.get()) {
WaitExecutionRequest wr =
WaitExecutionRequest.newBuilder().setName(operation.get().getName()).build();
replies = execBlockingStub().waitExecution(wr);
} else {
replies = execBlockingStub().execute(request);
}
throw e;
} finally {
// The blocking streaming call closes correctly only when trailers and a Status
// are received from the server so that onClose() is called on this call's
// CallListener. Under normal circumstances (no cancel/errors), these are
// guaranteed to be sent by the server only if replies.hasNext() has been called
// after all replies from the stream have been consumed.
try {
while (replies.hasNext()) {
replies.next();
Operation o = replies.next();
operation.set(o);
waitExecution.set(!operation.get().getDone());
ExecuteResponse r = getOperationResponse(o);
if (r != null) {
return r;
}
}
} catch (StatusRuntimeException e) {
// Cleanup: ignore exceptions, because the meaningful errors have already been
// propagated.
if (e.getStatus().getCode() == Code.NOT_FOUND) {
// Operation was lost on the server. Retry Execute.
waitExecution.set(false);
}
throw e;
} finally {
// The blocking streaming call closes correctly only when trailers and a Status
// are received from the server so that onClose() is called on this call's
// CallListener. Under normal circumstances (no cancel/errors), these are
// guaranteed to be sent by the server only if replies.hasNext() has been called
// after all replies from the stream have been consumed.
try {
while (replies.hasNext()) {
replies.next();
}
} catch (StatusRuntimeException e) {
// Cleanup: ignore exceptions, because the meaningful errors have already been
// propagated.
}
}
}
throw new IOException(
String.format(
"Remote server error: execution request for %s terminated with no result.",
operation.get().getName()));
});
throw new IOException(
String.format(
"Remote server error: execution request for %s terminated with no result.",
operation.get().getName()));
});
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,21 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* Specific retry logic for remote execution/caching.
*
* <p>A call can disable retries by throwing a {@link PassThroughException}. <code>
* RemoteRetrier r = ...;
* try {
* r.execute(() -> {
* // Not retried.
* throw PassThroughException(new IOException("fail"));
* }
* } catch (RetryException e) {
* // e.getCause() is the IOException
* System.out.println(e.getCause());
* }
* </code>
*/
/** Specific retry logic for remote execution/caching. */
public class RemoteRetrier extends Retrier {

/**
* Wraps around an {@link Exception} to make it pass through a single layer of retries.
*/
public static class PassThroughException extends Exception {
public PassThroughException(Exception e) {
super(e);
}
}

public static final Predicate<? super Exception> RETRIABLE_GRPC_ERRORS =
e -> {
if (!(e instanceof StatusException) && !(e instanceof StatusRuntimeException)) {
Expand Down Expand Up @@ -102,7 +80,7 @@ public RemoteRetrier(
Predicate<? super Exception> shouldRetry,
ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
super(backoff, supportPassthrough(shouldRetry), retryScheduler, circuitBreaker);
super(backoff, shouldRetry, retryScheduler, circuitBreaker);
}

@VisibleForTesting
Expand All @@ -112,30 +90,24 @@ public RemoteRetrier(
ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker,
Sleeper sleeper) {
super(backoff, supportPassthrough(shouldRetry), retryScheduler, circuitBreaker, sleeper);
super(backoff, shouldRetry, retryScheduler, circuitBreaker, sleeper);
}

/**
* Execute a callable with retries. {@link IOException} and {@link InterruptedException} are
* propagated directly to the caller. All other exceptions are wrapped in {@link RuntimeError}.
*/
@Override
public <T> T execute(Callable<T> call) throws RetryException, InterruptedException {
public <T> T execute(Callable<T> call) throws IOException, InterruptedException {
try {
return super.execute(call);
} catch (RetryException e) {
if (e.getCause() instanceof PassThroughException) {
PassThroughException passThrough = (PassThroughException) e.getCause();
throw new RetryException("Retries aborted because of PassThroughException",
e.getAttempts(), (Exception) passThrough.getCause());
}
throw e;
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
Throwables.propagateIfInstanceOf(e, InterruptedException.class);
throw Throwables.propagate(e);
}
}


private static Predicate<? super Exception> supportPassthrough(
Predicate<? super Exception> delegate) {
// PassThroughException is not retriable.
return e -> !(e instanceof PassThroughException) && delegate.test(e);
}

static class ExponentialBackoff implements Retrier.Backoff {

private final long maxMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package com.google.devtools.build.lib.remote;

import com.google.devtools.build.lib.remote.Retrier.RetryException;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
Expand All @@ -23,23 +22,22 @@
public final class RemoteRetrierUtils {

public static boolean causedByStatus(Throwable e, Status.Code expected) {
if (e instanceof RetryException) {
e = e.getCause();
}
if (e instanceof StatusRuntimeException) {
return ((StatusRuntimeException) e).getStatus().getCode() == expected;
} else if (e instanceof StatusException) {
return ((StatusException) e).getStatus().getCode() == expected;
} else if (e.getCause() != null) {
return causedByStatus(e.getCause(), expected);
}
return false;
}

public static boolean causedByExecTimeout(Throwable e) {
if (!(e instanceof RetryException)) {
return false;
if (e instanceof ExecutionStatusException) {
return ((ExecutionStatusException) e).isExecutionTimeout();
} else if (e.getCause() != null) {
return causedByExecTimeout(e.getCause());
}
e = e.getCause();
return (e instanceof ExecutionStatusException
&& ((ExecutionStatusException) e).isExecutionTimeout());
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import io.grpc.CallCredentials;
import io.grpc.Context;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,6 +74,8 @@ public ServerCapabilities get(String buildRequestId, String commandId)
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
return retrier.execute(() -> capabilitiesBlockingStub().getCapabilities(request));
} catch (StatusRuntimeException e) {
throw new IOException(e);
} finally {
withMetadata.detach(previous);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
.build();
return SpawnCache.success(spawnResult);
}
} catch (CacheNotFoundException e) {
// Intentionally left blank
} catch (IOException e) {
if (!AbstractRemoteActionCache.causedByCacheMiss(e)) {
String errorMsg = e.getMessage();
if (isNullOrEmpty(errorMsg)) {
String errorMsg = e.getMessage();
if (isNullOrEmpty(errorMsg)) {
errorMsg = e.getClass().getSimpleName();
}
}
errorMsg = "Reading from Remote Cache:\n" + errorMsg;
report(Event.warn(errorMsg));
}
} finally {
withMetadata.detach(previous);
}
Expand Down
Loading

0 comments on commit 1532df0

Please sign in to comment.