Skip to content

Commit

Permalink
xds: Allow FaultFilter's interceptor to be reused
Browse files Browse the repository at this point in the history
This is the only usage of PickSubchannelArgs when creating a filter's
ClientInterceptor, and a follow-up commit will remove the argument and
actually reuse the interceptors. Other filter's interceptors can
already be reused.

There doesn't seem to be any significant loss of legibility by making
FaultFilter a more ordinary interceptor, but the change does cause the
ForwardingClientCall to be present when faultDelay is configured,
independent of whether the fault delay ends up being triggered.

Reusing interceptors will move more state management out of the RPC path
which will be more relevant with RLQS.
  • Loading branch information
ejona86 committed Jan 29, 2025
1 parent 9e86299 commit b3db8c2
Showing 1 changed file with 81 additions and 73 deletions.
154 changes: 81 additions & 73 deletions xds/src/main/java/io/grpc/xds/FaultFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,94 +190,102 @@ public ClientInterceptor buildClientInterceptor(
config = overrideConfig;
}
FaultConfig faultConfig = (FaultConfig) config;
Long delayNanos = null;
Status abortStatus = null;
if (faultConfig.maxActiveFaults() == null
|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
Metadata headers = args.getHeaders();
if (faultConfig.faultDelay() != null) {
delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
}
if (faultConfig.faultAbort() != null) {
abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers);
}
}
if (delayNanos == null && abortStatus == null) {
return null;
}
final Long finalDelayNanos = delayNanos;
final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus);

final class FaultInjectionInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
final Channel next) {
Executor callExecutor = callOptions.getExecutor();
if (callExecutor == null) { // This should never happen in practice because
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
// a callExecutor.
// TODO(https://github.com/grpc/grpc-java/issues/7868)
callExecutor = MoreExecutors.directExecutor();
boolean checkFault = false;
if (faultConfig.maxActiveFaults() == null
|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
}
if (finalDelayNanos != null) {
Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
if (finalAbortStatus != null) {
callSupplier = Suppliers.ofInstance(
new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor));
} else {
callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
@Override
public ClientCall<ReqT, RespT> get() {
return next.newCall(method, callOptions);
}
};
if (!checkFault) {
return next.newCall(method, callOptions);
}
final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
private ClientCall<ReqT, RespT> delegate;

@Override
protected ClientCall<ReqT, RespT> delegate() {
return delegate;
}
final DelayInjectedCall<ReqT, RespT> delayInjectedCall = new DelayInjectedCall<>(
finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);

final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
@Override
protected ClientCall<ReqT, RespT> delegate() {
return delayInjectedCall;
@Override
public void start(Listener<RespT> listener, Metadata headers) {
Executor callExecutor = callOptions.getExecutor();
if (callExecutor == null) { // This should never happen in practice because
// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
// a callExecutor.
// TODO(https://github.com/grpc/grpc-java/issues/7868)
callExecutor = MoreExecutors.directExecutor();
}

@Override
public void start(Listener<RespT> listener, Metadata headers) {
Listener<RespT> finalListener =
new SimpleForwardingClientCallListener<RespT>(listener) {
@Override
public void onClose(Status status, Metadata trailers) {
if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
// TODO(zdapeng:) check effective deadline locally, and
// do the following only if the local deadline is exceeded.
// (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
// injected delay does not contribute to the error, because the request is
// only sent out after the delay. There could be a race between local and
// remote, but it is rather rare.)
String description = String.format(
Locale.US,
"Deadline exceeded after up to %d ns of fault-injected delay",
finalDelayNanos);
if (status.getDescription() != null) {
description = description + ": " + status.getDescription();
}
status = Status.DEADLINE_EXCEEDED
.withDescription(description).withCause(status.getCause());
// Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata();
Long delayNanos;
Status abortStatus = null;
if (faultConfig.faultDelay() != null) {
delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
} else {
delayNanos = null;
}
if (faultConfig.faultAbort() != null) {
abortStatus = getAbortStatusWithDescription(
determineFaultAbortStatus(faultConfig.faultAbort(), headers));
}

Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
if (abortStatus != null) {
callSupplier = Suppliers.ofInstance(
new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
} else {
callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
@Override
public ClientCall<ReqT, RespT> get() {
return next.newCall(method, callOptions);
}
};
}
if (delayNanos == null) {
delegate = callSupplier.get();
delegate().start(listener, headers);
return;
}

delegate = new DelayInjectedCall<>(
delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);

Listener<RespT> finalListener =
new SimpleForwardingClientCallListener<RespT>(listener) {
@Override
public void onClose(Status status, Metadata trailers) {
if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
// TODO(zdapeng:) check effective deadline locally, and
// do the following only if the local deadline is exceeded.
// (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
// injected delay does not contribute to the error, because the request is
// only sent out after the delay. There could be a race between local and
// remote, but it is rather rare.)
String description = String.format(
Locale.US,
"Deadline exceeded after up to %d ns of fault-injected delay",
delayNanos);
if (status.getDescription() != null) {
description = description + ": " + status.getDescription();
}
delegate().onClose(status, trailers);
status = Status.DEADLINE_EXCEEDED
.withDescription(description).withCause(status.getCause());
// Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata();
}
};
delegate().start(finalListener, headers);
}
delegate().onClose(status, trailers);
}
};
delegate().start(finalListener, headers);
}

return new DeadlineInsightForwardingCall();
} else {
return new FailingClientCall<>(finalAbortStatus, callExecutor);
}

return new DeadlineInsightForwardingCall();
}
}

Expand Down

0 comments on commit b3db8c2

Please sign in to comment.