From 6f737d4d538b8a7d7f5cf1e203b2154ebfa46f9f Mon Sep 17 00:00:00 2001 From: George Gensure Date: Thu, 6 Jul 2023 11:01:42 -0700 Subject: [PATCH] Avoid discarding SRE state for IO cause Unwrapping all StatusRuntimeExceptions in in ReferenceCountedChannel when caused by IOException will discard critical tracing and retriability. The Retrier evaluations may not see an SRE in the causal chain, and presume it is invariably an unretriable exception. In general, IOExceptions as SRE wrappers are unsuitable containers and are routinely misued either for identification (grpc aware status), or capture (handleInitError). Partially addresses #18764 (retries will occur with SSL handshake timeout, but the actual connection will not be retried) Closes #18836. PiperOrigin-RevId: 546037698 Change-Id: I7f6efcb857c557aa97ad3df085fc032c8538eb9a --- .../lib/remote/ReferenceCountedChannel.java | 49 ++++++++++++++++--- .../build/lib/remote/RemoteModule.java | 21 ++++---- .../lib/remote/RemoteServerCapabilities.java | 27 +++++----- 3 files changed, 64 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 8e4e95afd56810..bc2f6ec59940d9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -13,10 +13,11 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection; import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool; @@ -28,9 +29,12 @@ import io.netty.util.ReferenceCounted; import io.reactivex.rxjava3.annotations.CheckReturnValue; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import io.reactivex.rxjava3.core.SingleSource; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Function; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count. @@ -80,19 +84,48 @@ public ListenableFuture withChannelFuture( } public T withChannelBlocking(Function source) - throws IOException, InterruptedException { + throws ExecutionException, IOException, InterruptedException { try { - return withChannel(channel -> Single.just(source.apply(channel))).blockingGet(); - } catch (RuntimeException e) { + return withChannelBlockingGet(source); + } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause != null) { - throwIfInstanceOf(cause, IOException.class); - throwIfInstanceOf(cause, InterruptedException.class); - } + Throwables.throwIfInstanceOf(cause, IOException.class); + Throwables.throwIfUnchecked(cause); throw e; } } + // prevents rxjava silent possible wrap of RuntimeException and misinterpretation + private T withChannelBlockingGet(Function source) + throws ExecutionException, InterruptedException { + SettableFuture future = SettableFuture.create(); + withChannel(channel -> Single.just(source.apply(channel))) + .subscribe( + new SingleObserver() { + @Override + public void onError(Throwable t) { + future.setException(t); + } + + @Override + public void onSuccess(T t) { + future.set(t); + } + + @Override + public void onSubscribe(Disposable d) { + future.addListener( + () -> { + if (future.isCancelled()) { + d.dispose(); + } + }, + directExecutor()); + } + }); + return future.get(); + } + @CheckReturnValue public Single withChannel(Function> source) { return dynamicConnectionPool diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 2c3930733d33a5..b3592a7222303e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -191,6 +191,7 @@ private static boolean shouldEnableRemoteDownloader(RemoteOptions options) { return !Strings.isNullOrEmpty(options.remoteDownloader); } + @Nullable private static ServerCapabilities getAndVerifyServerCapabilities( RemoteOptions remoteOptions, ReferenceCountedChannel channel, @@ -578,7 +579,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, ServerCapabilitiesRequirement.CACHE); } - } catch (IOException e) { + } catch (AbruptExitException e) { + throw e; // prevent abrupt interception + } catch (Exception e) { String errorMessage = "Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e, verboseFailures); @@ -603,12 +606,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) { try { remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority); - } catch (IOException e) { + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; - } catch (InterruptedException e) { - handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE); - return; } if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) { remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName; @@ -630,7 +633,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions.remoteVerifyDownloads, digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -695,7 +698,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions.remoteVerifyDownloads, digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -741,7 +744,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } private static void handleInitFailure( - CommandEnvironment env, IOException e, Code remoteExecutionCode) { + CommandEnvironment env, Exception e, Code remoteExecutionCode) { env.getReporter().handle(Event.error(e.getMessage())); env.getBlazeModuleEnvironment() .exit( @@ -884,7 +887,7 @@ private static void checkClientServerCompatibility( } @Override - public void afterCommand() throws AbruptExitException { + public void afterCommand() { Preconditions.checkNotNull(blockWaitingModule, "blockWaitingModule must not be null"); // Some cleanup tasks must wait until every other BlazeModule's afterCommand() has run, as diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java index 65624b09392c6e..8dc45b8e99f89b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java @@ -31,7 +31,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import io.grpc.CallCredentials; import io.grpc.Channel; -import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -73,21 +72,17 @@ public ServerCapabilities get(String buildRequestId, String commandId) RequestMetadata metadata = TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null); RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata); - try { - GetCapabilitiesRequest request = - instanceName == null - ? GetCapabilitiesRequest.getDefaultInstance() - : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); - return retrier.execute( - () -> - channel.withChannelBlocking( - channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request))); - } catch (StatusRuntimeException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw new IOException(e); - } + GetCapabilitiesRequest request = + instanceName == null + ? GetCapabilitiesRequest.getDefaultInstance() + : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); + ServerCapabilities caps = + retrier.execute( + () -> + channel.withChannelBlocking( + channel -> + capabilitiesBlockingStub(context, channel).getCapabilities(request))); + return caps; } static class ClientServerCompatibilityStatus {