Skip to content

Commit

Permalink
Release 6.4.0 remote (#18959)
Browse files Browse the repository at this point in the history
* Include stack trace in all gRPC errors when --verbose_failures is set.

Also refactor a couple places where the stack trace was included in an ad-hoc
manner, and force Utils.grpcAwareErrorMessage callers to be explicit to avoid
future instances.

Closes #16086.

PiperOrigin-RevId: 502854490
Change-Id: Id2d6a1728630fffea9399b406378c7f173b247bd

* Avoid discarding SRE state for IO cause

Unwrapping all StatusRuntimeExceptions in in ReferenceCountedChannel when caused by IOException will discard critical tracing and retriability. The Retrier evaluations may not see an SRE in the causal chain, and presume it is invariably an unretriable exception. In general, IOExceptions as SRE wrappers are unsuitable containers and are routinely misued either for identification (grpc aware status), or capture (handleInitError).

Partially addresses #18764 (retries will occur with SSL handshake timeout, but the actual connection will not be retried)

Closes #18836.

PiperOrigin-RevId: 546037698
Change-Id: I7f6efcb857c557aa97ad3df085fc032c8538eb9a

* Operation stream termination is not an error

According to the GrpcRemoteExecutor when it occurs after a !done operation response. Remove the error from the
ExperimentalRemoteGrpcExecutor and reinforce both with tests.

Update the FakeExecutionService to generate compatible error responses that appear in the ExecuteResponse, rather than the operation error field, per the REAPI spec. Made required adjustments to ExGRE Test invocations to avoid the ExecutionStatusException interpretation of DEADLINE_EXCEEDED -> FAILED_PRECONDITION in ExecuteResponse.

Closes #18785.

PiperOrigin-RevId: 546925894
Change-Id: I7a489c8bc936a83cfd94e0138437f3fe6d152da8

* Done operations must be reexecuted

Any operation with done == true as reported by the server is not expected to change its result on subsequent waitExecution calls. To properly retry, this action must be reexecuted, if it was truly transient, to achieve a definitive result. Submit a transient status for retry, disallow special behaviors for NOT_FOUND as covered by done observation, and consider method type when handling operation streams.

Closes #18943.

PiperOrigin-RevId: 548680656
Change-Id: Ib2c9887ead1fbd3de97761db6e8b4077783ad03c

---------

Co-authored-by: Tiago Quelhas <tjgq@google.com>
  • Loading branch information
werkt and tjgq authored Jul 25, 2023
1 parent e9a0383 commit bade2e9
Show file tree
Hide file tree
Showing 12 changed files with 601 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ ExecuteResponse start() throws IOException, InterruptedException {
// retrying when received a unauthenticated error, and propagate to refreshIfUnauthenticated
// which will then call retrier again. It will reset the retry time counter so we could
// retry more than --remote_retry times which is not expected.
response =
retrier.execute(
() -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider),
executeBackoff);
if (lastOperation == null) {
response =
retrier.execute(
() -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider),
executeBackoff);
}

// If no response from Execute(), use WaitExecution() in a "loop" which is implemented
// inside the retry block.
Expand Down Expand Up @@ -177,7 +179,7 @@ ExecuteResponse execute() throws IOException {

try {
Iterator<Operation> operationStream = executeFunction.apply(request);
return handleOperationStream(operationStream);
return handleOperationStream(operationStream, /* waitExecution= */ false);
} catch (Throwable e) {
// If lastOperation is not null, we know the execution request is accepted by the server. In
// this case, we will fallback to WaitExecution() loop when the stream is broken.
Expand All @@ -197,33 +199,43 @@ ExecuteResponse waitExecution() throws IOException {
WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
try {
Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
return handleOperationStream(operationStream);
return handleOperationStream(operationStream, /* waitExecution= */ true);
} catch (StatusRuntimeException e) {
throw new IOException(e);
} catch (Throwable e) {
// A NOT_FOUND error means Operation was lost on the server, retry Execute().
//
// However, we only retry Execute() if executeBackoff should retry. Also increase the retry
// counter at the same time (done by nextDelayMillis()).
if (e instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) e;
if (sre.getStatus().getCode() == Code.NOT_FOUND
&& executeBackoff.nextDelayMillis(sre) >= 0) {
lastOperation = null;
return null;
}
}
lastOperation = null;
throw new IOException(e);
}
}

/** Process a stream of operations from Execute() or WaitExecution(). */
ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throws IOException {
@Nullable
ExecuteResponse handleOperationStream(
Iterator<Operation> operationStream, boolean waitExecution) throws IOException {
try {
while (operationStream.hasNext()) {
Operation operation = operationStream.next();
ExecuteResponse response = extractResponseOrThrowIfError(operation);

// At this point, we successfully received a response that is not an error.
lastOperation = operation;
// Either done or should be repeated
lastOperation = operation.getDone() ? null : operation;

ExecuteResponse response;
try {
response = extractResponseOrThrowIfError(operation);
} catch (StatusRuntimeException e) {
// An operation error means Operation has been terminally completed, retry Execute().
//
// However, we only retry Execute() if executeBackoff should retry. Also increase the
// retry
// counter at the same time (done by nextDelayMillis()).
if (waitExecution
&& (retrier.isRetriable(e) || e.getStatus().getCode() == Code.NOT_FOUND)
&& executeBackoff.nextDelayMillis(e) >= 0) {
lastOperation = null;
return null;
}
throw e;
}

// We don't want to reset executeBackoff since if there is an error:
// 1. If happened before we received a first response, we want to ensure the retry
Expand Down Expand Up @@ -258,8 +270,8 @@ ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throw
}
}

// The operation completed successfully but without a result.
throw new IOException("Remote server error: execution terminated with no result");
// The operation stream completed successfully but without a result.
return null;
} finally {
close(operationStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection;
import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool;
Expand All @@ -28,9 +29,12 @@
import io.netty.util.ReferenceCounted;
import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count.
Expand Down Expand Up @@ -80,19 +84,48 @@ public <T> ListenableFuture<T> withChannelFuture(
}

public <T> T withChannelBlocking(Function<Channel, T> source)
throws IOException, InterruptedException {
throws ExecutionException, IOException, InterruptedException {
try {
return withChannel(channel -> Single.just(source.apply(channel))).blockingGet();
} catch (RuntimeException e) {
return withChannelBlockingGet(source);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, IOException.class);
throwIfInstanceOf(cause, InterruptedException.class);
}
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfUnchecked(cause);
throw e;
}
}

// prevents rxjava silent possible wrap of RuntimeException and misinterpretation
private <T> T withChannelBlockingGet(Function<Channel, T> source)
throws ExecutionException, InterruptedException {
SettableFuture<T> future = SettableFuture.create();
withChannel(channel -> Single.just(source.apply(channel)))
.subscribe(
new SingleObserver<T>() {
@Override
public void onError(Throwable t) {
future.setException(t);
}

@Override
public void onSuccess(T t) {
future.set(t);
}

@Override
public void onSubscribe(Disposable d) {
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}
});
return future.get();
}

@CheckReturnValue
public <T> Single<T> withChannel(Function<Channel, ? extends SingleSource<? extends T>> source) {
return dynamicConnectionPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private static boolean shouldEnableRemoteDownloader(RemoteOptions options) {
return !Strings.isNullOrEmpty(options.remoteDownloader);
}

@Nullable
private static ServerCapabilities getAndVerifyServerCapabilities(
RemoteOptions remoteOptions,
ReferenceCountedChannel channel,
Expand Down Expand Up @@ -535,9 +536,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
ServerCapabilitiesRequirement.CACHE);
}
} catch (IOException e) {
} catch (AbruptExitException e) {
throw e; // prevent abrupt interception
} catch (Exception e) {
String errorMessage =
"Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e);
"Failed to query remote execution capabilities: "
+ Utils.grpcAwareErrorMessage(e, verboseFailures);
if (remoteOptions.remoteLocalFallback) {
if (verboseFailures) {
errorMessage += System.lineSeparator() + Throwables.getStackTraceAsString(e);
Expand All @@ -559,12 +563,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
try {
remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
} catch (IOException e) {
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
} catch (InterruptedException e) {
handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE);
return;
}
if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
Expand All @@ -587,7 +591,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
!remoteOptions.remoteOutputsMode.downloadAllOutputs(),
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -652,7 +656,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
!remoteOptions.remoteOutputsMode.downloadAllOutputs(),
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -698,7 +702,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

private static void handleInitFailure(
CommandEnvironment env, IOException e, Code remoteExecutionCode) {
CommandEnvironment env, Exception e, Code remoteExecutionCode) {
env.getReporter().handle(Event.error(e.getMessage()));
env.getBlazeModuleEnvironment()
.exit(
Expand Down Expand Up @@ -793,7 +797,7 @@ private static void checkClientServerCompatibility(
}

@Override
public void afterCommand() throws AbruptExitException {
public void afterCommand() {
Preconditions.checkNotNull(blockWaitingModule, "blockWaitingModule must not be null");

// Some cleanup tasks must wait until every other BlazeModule's afterCommand() has run, as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,21 +72,17 @@ public ServerCapabilities get(String buildRequestId, String commandId)
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
try {
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
return retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
} catch (StatusRuntimeException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new IOException(e);
}
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
ServerCapabilities caps =
retrier.execute(
() ->
channel.withChannelBlocking(
channel ->
capabilitiesBlockingStub(context, channel).getCapabilities(request)));
return caps;
}

static class ClientServerCompatibilityStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FileArtifactValue;
Expand Down Expand Up @@ -141,13 +140,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
// Intentionally left blank
} else {
String errorMessage;
if (!verboseFailures) {
errorMessage = Utils.grpcAwareErrorMessage(e);
} else {
// On --verbose_failures print the whole stack trace
errorMessage = "\n" + Throwables.getStackTraceAsString(e);
}
String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures);
if (isNullOrEmpty(errorMessage)) {
errorMessage = e.getClass().getSimpleName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.CommandLines.ParamFileActionInput;
Expand Down Expand Up @@ -573,11 +572,7 @@ private SpawnResult handleError(
catastrophe = false;
}

String errorMessage = Utils.grpcAwareErrorMessage(exception);
if (verboseFailures) {
// On --verbose_failures print the whole stack trace
errorMessage += "\n" + Throwables.getStackTraceAsString(exception);
}
String errorMessage = Utils.grpcAwareErrorMessage(exception, verboseFailures);

if (exception.getCause() instanceof ExecutionStatusException) {
ExecutionStatusException e = (ExecutionStatusException) exception.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private static String executionStatusExceptionErrorMessage(ExecutionStatusExcept
+ errorDetailsMessage(status.getDetailsList());
}

public static String grpcAwareErrorMessage(IOException e) {
private static String grpcAwareErrorMessage(IOException e) {
io.grpc.Status errStatus = io.grpc.Status.fromThrowable(e);
if (e.getCause() instanceof ExecutionStatusException) {
// Display error message returned by the remote service.
Expand Down
Loading

0 comments on commit bade2e9

Please sign in to comment.