Skip to content

Commit

Permalink
api: Replace ErrorPicker with FixedResultPicker
Browse files Browse the repository at this point in the history
FixedResultPicker can be used in more situations. Note that
WrrLocalityLoadBalancerTest's test was changed non-trivially. The
noChildLb test was particularly nasty as it assumed
LoadBalancer.ErrorPicker had same toString() as
GracefulSwitchLoadBalancer's ErrorPicker.
  • Loading branch information
ejona86 authored Sep 26, 2023
1 parent a004096 commit cf4cf03
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 68 deletions.
24 changes: 24 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,12 @@ public abstract static class Factory {
public abstract LoadBalancer newLoadBalancer(Helper helper);
}

/**
* A picker that always returns an erring pick.
*
* @deprecated Use {@code new FixedResultPicker(PickResult.withError(error))} instead.
*/
@Deprecated
public static final class ErrorPicker extends SubchannelPicker {

private final Status error;
Expand All @@ -1433,4 +1439,22 @@ public String toString() {
}
}

/** A picker that always returns the same result. */
public static final class FixedResultPicker extends SubchannelPicker {
private final PickResult result;

public FixedResultPicker(PickResult result) {
this.result = Preconditions.checkNotNull(result, "result");
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return result;
}

@Override
public String toString() {
return "FixedResultPicker(" + result + ")";
}
}
}
15 changes: 1 addition & 14 deletions util/src/main/java/io/grpc/util/GracefulSwitchLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.ExperimentalApi;
Expand Down Expand Up @@ -57,21 +56,9 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {

@Override
public void handleNameResolutionError(final Status error) {
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(ErrorPicker.class).add("error", error).toString();
}
}

helper.updateBalancingState(
ConnectivityState.TRANSIENT_FAILURE,
new ErrorPicker());
new FixedResultPicker(PickResult.withError(error)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected SubchannelPicker getInitialPicker() {
}

protected SubchannelPicker getErrorPicker(Status error) {
return new ErrorPicker(error);
return new FixedResultPicker(PickResult.withError(error));
}

@VisibleForTesting
Expand Down
12 changes: 8 additions & 4 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public void handleNameResolutionError(Status error) {
if (cdsLbState != null && cdsLbState.childLb != null) {
cdsLbState.childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

Expand Down Expand Up @@ -211,7 +212,8 @@ private void handleClusterDiscovered() {
}

if (loopStatus != null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(loopStatus));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
return;
}

Expand All @@ -223,7 +225,8 @@ private void handleClusterDiscovered() {
Status unavailable =
Status.UNAVAILABLE.withDescription("CDS error: found 0 leaf (logical DNS or EDS) "
+ "clusters for root cluster " + root.name);
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
return;
}

Expand Down Expand Up @@ -295,7 +298,8 @@ private void handleClusterDiscoveryError(Status error) {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

Expand Down
3 changes: 2 additions & 1 deletion xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public void handleNameResolutionError(Status error) {
if (childSwitchLb != null) {
childSwitchLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void handleNameResolutionError(Status error) {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

Expand Down Expand Up @@ -240,7 +241,8 @@ private void handleEndpointResourceUpdate() {
Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
.withDescription(endpointNotFound.getDescription());
}
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(endpointNotFound));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound)));
if (childLb != null) {
childLb.shutdown();
childLb = null;
Expand Down Expand Up @@ -275,7 +277,8 @@ private void handleEndpointResolutionError() {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ public void handleNameResolutionError(Status error) {
}
}
if (gotoTransientFailure) {
updateOverallState(null, TRANSIENT_FAILURE, new ErrorPicker(error));
updateOverallState(
null, TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

Expand Down Expand Up @@ -225,8 +226,8 @@ public void run() {
// The child is deactivated.
return;
}
picker = new ErrorPicker(
Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority));
picker = new FixedResultPicker(PickResult.withError(
Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority)));
logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
currentPriority = null; // reset currentPriority to guarantee failover happen
tryNextPriority();
Expand Down
3 changes: 2 additions & 1 deletion xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ private static List<RingEntry> buildRing(
@Override
public void handleNameResolutionError(Status error) {
if (currentState != READY) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public boolean acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddress
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
if (childBalancers.isEmpty()) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
for (LoadBalancer childBalancer : childBalancers.values()) {
childBalancer.handleNameResolutionError(error);
Expand Down
8 changes: 4 additions & 4 deletions xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);

if (locality == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided")));
helper.updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided"))));
return false;
}
if (localityWeight == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
helper.updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
Status.UNAVAILABLE.withDescription(
"wrr_locality error: no weight provided for locality " + locality)));
"wrr_locality error: no weight provided for locality " + locality))));
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();

if (failing) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
}
return true;
}
Expand Down
38 changes: 25 additions & 13 deletions xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
Expand Down Expand Up @@ -306,7 +306,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
assertCurrentPickerPicksSubchannel(subchannel0);

// p0 fails over to p1 immediately.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.ABORTED)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(2);
assertThat(fooHelpers).hasSize(2);
Expand Down Expand Up @@ -345,11 +346,13 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
assertCurrentPickerPicksSubchannel(subchannel2);

// p2 fails but does not affect overall picker
helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper2.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerPicksSubchannel(subchannel2);

// p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(4);
assertThat(fooHelpers).hasSize(4);
Expand All @@ -362,7 +365,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {

// p3 fails then the picker should have error status updated
helper3.updateBalancingState(
TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("foo")));
TRANSIENT_FAILURE,
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS.withDescription("foo"))));
assertCurrentPickerReturnsError(Status.Code.DATA_LOSS, "foo");

// p2 gets back to READY
Expand Down Expand Up @@ -390,7 +394,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
assertCurrentPickerPicksSubchannel(subchannel4);

// p0 fails over to p2 and picker is updated to p2's existing picker.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerPicksSubchannel(subchannel3);

// Deactivate child balancer get deleted.
Expand Down Expand Up @@ -564,7 +569,8 @@ public void typicalPriorityFailOverFlowWithIdleUpdate() {
assertCurrentPickerIsBufferPicker();

// p0 fails over to p1 immediately.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.ABORTED)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(2);
assertThat(fooHelpers).hasSize(2);
Expand All @@ -591,11 +597,13 @@ public void typicalPriorityFailOverFlowWithIdleUpdate() {
assertCurrentPickerIsBufferPicker();

// p2 fails but does not affect overall picker
helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper2.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerIsBufferPicker();

// p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(4);
assertThat(fooHelpers).hasSize(4);
Expand All @@ -608,7 +616,8 @@ public void typicalPriorityFailOverFlowWithIdleUpdate() {

// p3 fails then the picker should have error status updated
helper3.updateBalancingState(
TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("foo")));
TRANSIENT_FAILURE,
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS.withDescription("foo"))));
assertCurrentPickerReturnsError(Status.Code.DATA_LOSS, "foo");

// p2 gets back to IDLE
Expand All @@ -624,7 +633,8 @@ public void typicalPriorityFailOverFlowWithIdleUpdate() {
assertCurrentPickerIsBufferPicker();

// p0 fails over to p2 and picker is updated to p2's existing picker.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerIsBufferPicker();

// Deactivate child balancer get deleted.
Expand Down Expand Up @@ -655,7 +665,8 @@ public void bypassReresolutionRequestsIfConfiged() {
verify(helper, never()).refreshNameResolution();

// Simulate fallback to priority p1.
priorityHelper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
priorityHelper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertThat(fooHelpers).hasSize(2);
Helper priorityHelper1 = Iterables.getLast(fooHelpers);
priorityHelper1.refreshNameResolution();
Expand Down Expand Up @@ -780,7 +791,8 @@ static class FakeLoadBalancer extends LoadBalancer {

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
Expand Down Expand Up @@ -324,10 +324,10 @@ public void balancingStateUpdatedFromChildBalancers() {
mock(SubchannelPicker.class),
mock(SubchannelPicker.class)};
final SubchannelPicker[] failurePickers = new SubchannelPicker[]{
new ErrorPicker(Status.CANCELLED),
new ErrorPicker(Status.ABORTED),
new ErrorPicker(Status.DATA_LOSS),
new ErrorPicker(Status.DATA_LOSS)
new FixedResultPicker(PickResult.withError(Status.CANCELLED)),
new FixedResultPicker(PickResult.withError(Status.ABORTED)),
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS)),
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS))
};
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);

Expand Down Expand Up @@ -463,7 +463,8 @@ static class FakeLoadBalancer extends LoadBalancer {

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
}

@Override
Expand Down
Loading

0 comments on commit cf4cf03

Please sign in to comment.