Skip to content

Commit

Permalink
use an attribute from resolved addresses IS_PETIOLE_POLICY to control…
Browse files Browse the repository at this point in the history
… whether or not health checking is supported (grpc#11513)

* use an attribute from resolved addresses IS_PETIOLE_POLICY to control whether or not health checking is supported so that top level versions can't do any health checking, while those under petiole policies can.

Fixes grpc#11413
  • Loading branch information
larry-safran authored Sep 6, 2024
1 parent f6d2f20 commit 5de65a6
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
17 changes: 15 additions & 2 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs =
PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy

PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand All @@ -80,6 +81,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return Status.FAILED_PRECONDITION.withDescription("Already shut down");
}

// Cache whether or not this is a petiole policy, which is based off of an address attribute
Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;

List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();

// Validate the address list
Expand Down Expand Up @@ -303,7 +308,8 @@ private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
}
if (subchannelData.getHealthState() == READY) {

if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
updateBalancingState(READY,
new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
} else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
Expand Down Expand Up @@ -444,7 +450,7 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
hcListener.subchannelData = subchannelData;
subchannels.put(addr, subchannelData);
Attributes scAttrs = subchannel.getAttributes();
if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
}
subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
Expand All @@ -468,6 +474,13 @@ private final class HealthListener implements SubchannelStateListener {

@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
if (notAPetiolePolicy) {
log.log(Level.WARNING,
"Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
new Object[]{newState, subchannelData.subchannel});
return;
}

log.log(Level.FINE, "Received health status {0} for subchannel {1}",
new Object[]{newState, subchannelData.subchannel});
subchannelData.healthStateInfo = newState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY;
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY;
import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -389,15 +390,44 @@ public void pickAfterResolvedAndChanged() {
verify(mockSubchannel2).requestConnection();
}

@Test
public void healthCheck_nonPetiolePolicy() {
when(mockSubchannel1.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());

// Initialize with one server loadbalancer and both health and state listeners
List<EquivalentAddressGroup> oneServer = Lists.newArrayList(servers.get(0));
loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder().setAddresses(oneServer)
.setAttributes(Attributes.EMPTY).build());
InOrder inOrder = inOrder(mockHelper, mockSubchannel1);
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
SubchannelStateListener healthListener = createArgsCaptor.getValue()
.getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY);
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), any()); // health listener ignored

healthListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.INTERNAL));
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any(SubchannelPicker.class));
}

@Test
public void healthCheckFlow() {
when(mockSubchannel1.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());
when(mockSubchannel2.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());

List<EquivalentAddressGroup> oneServer = Lists.newArrayList(servers.get(0), servers.get(1));
loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder().setAddresses(oneServer)
.setAttributes(Attributes.EMPTY).build());
.setAttributes(Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build()).build());

InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2);
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
Expand All @@ -413,13 +443,13 @@ public void healthCheckFlow() {
// subchannel2 | IDLE | IDLE
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any());
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());

// subchannel | state | health
// subchannel1 | READY | CONNECTING
// subchannel2 | IDLE | IDLE
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any());
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());

// subchannel | state | health
// subchannel1 | READY | READY
Expand Down

0 comments on commit 5de65a6

Please sign in to comment.