diff --git a/xds/src/main/java/io/grpc/xds/FaultFilter.java b/xds/src/main/java/io/grpc/xds/FaultFilter.java index d46b3d30f5a..b7f7fa9c226 100644 --- a/xds/src/main/java/io/grpc/xds/FaultFilter.java +++ b/xds/src/main/java/io/grpc/xds/FaultFilter.java @@ -190,94 +190,102 @@ public ClientInterceptor buildClientInterceptor( config = overrideConfig; } FaultConfig faultConfig = (FaultConfig) config; - Long delayNanos = null; - Status abortStatus = null; - if (faultConfig.maxActiveFaults() == null - || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { - Metadata headers = args.getHeaders(); - if (faultConfig.faultDelay() != null) { - delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); - } - if (faultConfig.faultAbort() != null) { - abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers); - } - } - if (delayNanos == null && abortStatus == null) { - return null; - } - final Long finalDelayNanos = delayNanos; - final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus); final class FaultInjectionInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( final MethodDescriptor method, final CallOptions callOptions, final Channel next) { - Executor callExecutor = callOptions.getExecutor(); - if (callExecutor == null) { // This should never happen in practice because - // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with - // a callExecutor. - // TODO(https://github.com/grpc/grpc-java/issues/7868) - callExecutor = MoreExecutors.directExecutor(); + boolean checkFault = false; + if (faultConfig.maxActiveFaults() == null + || activeFaultCounter.get() < faultConfig.maxActiveFaults()) { + checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null; } - if (finalDelayNanos != null) { - Supplier> callSupplier; - if (finalAbortStatus != null) { - callSupplier = Suppliers.ofInstance( - new FailingClientCall(finalAbortStatus, callExecutor)); - } else { - callSupplier = new Supplier>() { - @Override - public ClientCall get() { - return next.newCall(method, callOptions); - } - }; + if (!checkFault) { + return next.newCall(method, callOptions); + } + final class DeadlineInsightForwardingCall extends ForwardingClientCall { + private ClientCall delegate; + + @Override + protected ClientCall delegate() { + return delegate; } - final DelayInjectedCall delayInjectedCall = new DelayInjectedCall<>( - finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier); - final class DeadlineInsightForwardingCall extends ForwardingClientCall { - @Override - protected ClientCall delegate() { - return delayInjectedCall; + @Override + public void start(Listener listener, Metadata headers) { + Executor callExecutor = callOptions.getExecutor(); + if (callExecutor == null) { // This should never happen in practice because + // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with + // a callExecutor. + // TODO(https://github.com/grpc/grpc-java/issues/7868) + callExecutor = MoreExecutors.directExecutor(); } - @Override - public void start(Listener listener, Metadata headers) { - Listener finalListener = - new SimpleForwardingClientCallListener(listener) { - @Override - public void onClose(Status status, Metadata trailers) { - if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) { - // TODO(zdapeng:) check effective deadline locally, and - // do the following only if the local deadline is exceeded. - // (If the server sends DEADLINE_EXCEEDED for its own deadline, then the - // injected delay does not contribute to the error, because the request is - // only sent out after the delay. There could be a race between local and - // remote, but it is rather rare.) - String description = String.format( - Locale.US, - "Deadline exceeded after up to %d ns of fault-injected delay", - finalDelayNanos); - if (status.getDescription() != null) { - description = description + ": " + status.getDescription(); - } - status = Status.DEADLINE_EXCEEDED - .withDescription(description).withCause(status.getCause()); - // Replace trailers to prevent mixing sources of status and trailers. - trailers = new Metadata(); + Long delayNanos; + Status abortStatus = null; + if (faultConfig.faultDelay() != null) { + delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers); + } else { + delayNanos = null; + } + if (faultConfig.faultAbort() != null) { + abortStatus = getAbortStatusWithDescription( + determineFaultAbortStatus(faultConfig.faultAbort(), headers)); + } + + Supplier> callSupplier; + if (abortStatus != null) { + callSupplier = Suppliers.ofInstance( + new FailingClientCall(abortStatus, callExecutor)); + } else { + callSupplier = new Supplier>() { + @Override + public ClientCall get() { + return next.newCall(method, callOptions); + } + }; + } + if (delayNanos == null) { + delegate = callSupplier.get(); + delegate().start(listener, headers); + return; + } + + delegate = new DelayInjectedCall<>( + delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier); + + Listener finalListener = + new SimpleForwardingClientCallListener(listener) { + @Override + public void onClose(Status status, Metadata trailers) { + if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) { + // TODO(zdapeng:) check effective deadline locally, and + // do the following only if the local deadline is exceeded. + // (If the server sends DEADLINE_EXCEEDED for its own deadline, then the + // injected delay does not contribute to the error, because the request is + // only sent out after the delay. There could be a race between local and + // remote, but it is rather rare.) + String description = String.format( + Locale.US, + "Deadline exceeded after up to %d ns of fault-injected delay", + delayNanos); + if (status.getDescription() != null) { + description = description + ": " + status.getDescription(); } - delegate().onClose(status, trailers); + status = Status.DEADLINE_EXCEEDED + .withDescription(description).withCause(status.getCause()); + // Replace trailers to prevent mixing sources of status and trailers. + trailers = new Metadata(); } - }; - delegate().start(finalListener, headers); - } + delegate().onClose(status, trailers); + } + }; + delegate().start(finalListener, headers); } - - return new DeadlineInsightForwardingCall(); - } else { - return new FailingClientCall<>(finalAbortStatus, callExecutor); } + + return new DeadlineInsightForwardingCall(); } }