diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 04b7663fd35..38e6381bfbf 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -18,26 +18,40 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; +import static io.grpc.xds.CdsLoadBalancer2.ClusterResolverConfig.DiscoveryMechanism; +import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; +import io.grpc.Attributes; +import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver; import io.grpc.Status; +import io.grpc.StatusOr; import io.grpc.SynchronizationContext; -import io.grpc.internal.ObjectPool; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.util.OutlierDetectionLoadBalancer; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; +import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; -import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.XdsEndpointResource.EdsUpdate; +import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -46,10 +60,14 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** @@ -62,46 +80,211 @@ final class CdsLoadBalancer2 extends LoadBalancer { private final Helper helper; private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; - // Following fields are effectively final. - private ObjectPool xdsClientPool; - private XdsClient xdsClient; - private CdsLbState cdsLbState; + private CdsLbState rootCdsLbState; private ResolvedAddresses resolvedAddresses; + private final BackoffPolicy.Provider backoffPolicyProvider; CdsLoadBalancer2(Helper helper) { - this(helper, LoadBalancerRegistry.getDefaultRegistry()); + this(helper, LoadBalancerRegistry.getDefaultRegistry(), + new ExponentialBackoffPolicy.Provider()); } @VisibleForTesting - CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) { + CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry, + BackoffPolicy.Provider backoffPolicyProvider) { this.helper = checkNotNull(helper, "helper"); this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); } + /** + * Generates the config to be used in the priority LB policy for the single priority of + * logical DNS cluster. + * + *

priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first + */ + static PriorityChildConfig generateDnsBasedPriorityChildConfig( + String cluster, @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata, + LoadBalancerRegistry lbRegistry, List dropOverloads) { + // Override endpoint-level LB policy with pick_first for logical DNS cluster. + Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + lbRegistry.getProvider("pick_first"), null); + ClusterImplLoadBalancerProvider.ClusterImplConfig clusterImplConfig = + new ClusterImplLoadBalancerProvider.ClusterImplConfig(cluster, null, lrsServerInfo, + maxConcurrentRequests, dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + LoadBalancerProvider clusterImplLbProvider = + lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); + Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + clusterImplLbProvider, clusterImplConfig); + return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/); + } + + /** + * Generates configs to be used in the priority LB policy for priorities in an EDS cluster. + * + *

priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB + * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental + */ + static Map generateEdsBasedPriorityChildConfigs( + String cluster, @Nullable String edsServiceName, + @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata, + @Nullable EnvoyServerProtoData.OutlierDetection outlierDetection, Object endpointLbConfig, + LoadBalancerRegistry lbRegistry, Map> prioritizedLocalityWeights, + List dropOverloads) { + Map configs = new HashMap<>(); + for (String priority : prioritizedLocalityWeights.keySet()) { + ClusterImplLoadBalancerProvider.ClusterImplConfig clusterImplConfig = + new ClusterImplLoadBalancerProvider.ClusterImplConfig( + cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, + dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + LoadBalancerProvider clusterImplLbProvider = + lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); + Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + clusterImplLbProvider, clusterImplConfig); + + // If outlier detection has been configured we wrap the child policy in the outlier detection + // load balancer. + if (outlierDetection != null) { + LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider( + "outlier_detection_experimental"); + priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + outlierDetectionProvider, + buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy)); + } + + PriorityChildConfig priorityChildConfig = + new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */); + configs.put(priority, priorityChildConfig); + } + return configs; + } + + /** + * Converts {@link EnvoyServerProtoData.OutlierDetection} that represents the xDS configuration to + * {@link OutlierDetectionLoadBalancerConfig} that the {@link OutlierDetectionLoadBalancer} + * understands. + */ + private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig( + EnvoyServerProtoData.OutlierDetection outlierDetection, Object childConfig) { + OutlierDetectionLoadBalancerConfig.Builder configBuilder + = new OutlierDetectionLoadBalancerConfig.Builder(); + + configBuilder.setChildConfig(childConfig); + + if (outlierDetection.intervalNanos() != null) { + configBuilder.setIntervalNanos(outlierDetection.intervalNanos()); + } + if (outlierDetection.baseEjectionTimeNanos() != null) { + configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos()); + } + if (outlierDetection.maxEjectionTimeNanos() != null) { + configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos()); + } + if (outlierDetection.maxEjectionPercent() != null) { + configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent()); + } + + EnvoyServerProtoData.SuccessRateEjection successRate = outlierDetection.successRateEjection(); + if (successRate != null) { + OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder successRateConfigBuilder = + new OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder(); + + if (successRate.stdevFactor() != null) { + successRateConfigBuilder.setStdevFactor(successRate.stdevFactor()); + } + if (successRate.enforcementPercentage() != null) { + successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage()); + } + if (successRate.minimumHosts() != null) { + successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts()); + } + if (successRate.requestVolume() != null) { + successRateConfigBuilder.setRequestVolume(successRate.requestVolume()); + } + + configBuilder.setSuccessRateEjection(successRateConfigBuilder.build()); + } + + EnvoyServerProtoData.FailurePercentageEjection failurePercentage = + outlierDetection.failurePercentageEjection(); + if (failurePercentage != null) { + OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder failurePctCfgBldr = + new OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder(); + + if (failurePercentage.threshold() != null) { + failurePctCfgBldr.setThreshold(failurePercentage.threshold()); + } + if (failurePercentage.enforcementPercentage() != null) { + failurePctCfgBldr.setEnforcementPercentage(failurePercentage.enforcementPercentage()); + } + if (failurePercentage.minimumHosts() != null) { + failurePctCfgBldr.setMinimumHosts(failurePercentage.minimumHosts()); + } + if (failurePercentage.requestVolume() != null) { + failurePctCfgBldr.setRequestVolume(failurePercentage.requestVolume()); + } + + configBuilder.setFailurePercentageEjection(failurePctCfgBldr.build()); + } + + return configBuilder.build(); + } + + /** + * Generates a string that represents the priority in the LB policy config. The string is unique + * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2. + * The ordering is undefined for priorities in different clusters. + */ + static String priorityName(String cluster, int priority) { + return cluster + "[child" + priority + "]"; + } + + /** + * Generates a string that represents the locality in the LB policy config. The string is unique + * across all localities in all clusters. + */ + static String localityName(Locality locality) { + return "{region=\"" + locality.region() + + "\", zone=\"" + locality.zone() + + "\", sub_zone=\"" + locality.subZone() + + "\"}"; + } + @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - if (this.resolvedAddresses != null) { - return Status.OK; - } + checkNotNull(resolvedAddresses, "resolvedAddresses"); + String rootClusterName = ((CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig()).name; + assert rootCdsLbState.root.name.equals(rootClusterName); + XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG); + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); this.resolvedAddresses = resolvedAddresses; - xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL); - xdsClient = xdsClientPool.getObject(); - CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - logger.log(XdsLogLevel.INFO, "Config: {0}", config); - cdsLbState = new CdsLbState(config.name); - cdsLbState.start(); + if (rootCdsLbState == null) { + rootCdsLbState = + new CdsLbState(rootClusterName, xdsConfig.getClusters().get(rootClusterName)); + rootCdsLbState.start(); + } + // Do all of the work changing the LB policies + rootCdsLbState.update(xdsConfig); + return Status.OK; } @Override public void handleNameResolutionError(Status error) { logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); - if (cdsLbState != null && cdsLbState.childLb != null) { - cdsLbState.childLb.handleNameResolutionError(error); + if (rootCdsLbState != null && rootCdsLbState.childLb != null) { + rootCdsLbState.childLb.handleNameResolutionError(error); } else { helper.updateBalancingState( TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); @@ -111,11 +294,726 @@ public void handleNameResolutionError(Status error) { @Override public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); - if (cdsLbState != null) { - cdsLbState.shutdown(); + if (rootCdsLbState != null) { + rootCdsLbState.shutdown(); + } + } + + final class ClusterResolverLbStateFactory extends Factory { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new ClusterResolverLbState(helper); + } + } + + /** + * The state of a cluster_resolver LB working session. A new instance is created whenever + * the cluster_resolver LB receives a new config. The old instance is replaced when the + * new one is ready to handle new RPCs. + */ + private final class ClusterResolverLbState extends LoadBalancer { + private final Helper helper; + private final List clusters = new ArrayList<>(); + private final Map clusterStates = new HashMap<>(); + private Object endpointLbConfig; + private ResolvedAddresses resolvedAddresses; + private LoadBalancer childLb; + + + ClusterResolverLbState(Helper helper) { + this.helper = new RefreshableHelper(checkNotNull(helper, "helper")); + logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState"); + } + + @Override + public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + this.resolvedAddresses = resolvedAddresses; + ClusterResolverConfig config = + (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + endpointLbConfig = config.lbConfig; + for (DiscoveryMechanism instance : config.discoveryMechanisms) { + clusters.add(instance.cluster); + ClusterState state; + if (instance.type == DiscoveryMechanism.Type.EDS) { + state = new EdsClusterState(instance.cluster, instance.edsServiceName, + instance.endpointConfig, + instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, + instance.filterMetadata, instance.outlierDetection); + } else { // logical DNS + state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, + instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, + instance.filterMetadata); + } + clusterStates.put(instance.cluster, state); + state.start(); + } + return Status.OK; + } + + @Override + public void handleNameResolutionError(Status error) { + if (childLb != null) { + childLb.handleNameResolutionError(error); + } else { + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); + } + } + + @Override + public void shutdown() { + for (ClusterState state : clusterStates.values()) { + state.shutdown(); + } + if (childLb != null) { + childLb.shutdown(); + } + } + + private void handleEndpointResourceUpdate() { + List addresses = new ArrayList<>(); + Map priorityChildConfigs = new HashMap<>(); + List priorities = new ArrayList<>(); // totally ordered priority list + + Status endpointNotFound = Status.OK; + for (String cluster : clusters) { + ClusterState state = clusterStates.get(cluster); + // Propagate endpoints to the child LB policy only after all clusters have been resolved. + if (!state.resolved && state.status.isOk()) { + return; + } + if (state.result != null) { + addresses.addAll(state.result.addresses); + priorityChildConfigs.putAll(state.result.priorityChildConfigs); + priorities.addAll(state.result.priorities); + } else { + endpointNotFound = state.status; + } + } + if (addresses.isEmpty()) { + if (endpointNotFound.isOk()) { + endpointNotFound = Status.UNAVAILABLE.withDescription( + "No usable endpoint from cluster(s): " + clusters); + } else { + endpointNotFound = + Status.UNAVAILABLE.withCause(endpointNotFound.getCause()) + .withDescription(endpointNotFound.getDescription()); + } + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound))); + if (childLb != null) { + childLb.shutdown(); + childLb = null; + } + return; + } + PriorityLoadBalancerProvider.PriorityLbConfig childConfig = + new PriorityLoadBalancerProvider.PriorityLbConfig( + Collections.unmodifiableMap(priorityChildConfigs), + Collections.unmodifiableList(priorities)); + if (childLb == null) { + childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper); + } + childLb.handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(childConfig) + .setAddresses(Collections.unmodifiableList(addresses)) + .build()); + } + + private void handleEndpointResolutionError() { + boolean allInError = true; + Status error = null; + for (String cluster : clusters) { + ClusterState state = clusterStates.get(cluster); + if (state.status.isOk()) { + allInError = false; + } else { + error = state.status; + } + } + if (allInError) { + if (childLb != null) { + childLb.handleNameResolutionError(error); + } else { + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); + } + } + } + + /** + * Wires re-resolution requests from downstream LB policies with DNS resolver. + */ + private final class RefreshableHelper extends ForwardingLoadBalancerHelper { + private final Helper delegate; + + private RefreshableHelper(Helper delegate) { + this.delegate = checkNotNull(delegate, "delegate"); + } + + @Override + public void refreshNameResolution() { + for (ClusterState state : clusterStates.values()) { + if (state instanceof LogicalDnsClusterState) { + ((LogicalDnsClusterState) state).refresh(); + } + } + } + + @Override + protected Helper delegate() { + return delegate; + } + } + + /** + * Resolution state of an underlying cluster. + */ + private abstract class ClusterState { + // Name of the cluster to be resolved. + protected final String name; + @Nullable + protected final Bootstrapper.ServerInfo lrsServerInfo; + @Nullable + protected final Long maxConcurrentRequests; + @Nullable + protected final EnvoyServerProtoData.UpstreamTlsContext tlsContext; + protected final Map filterMetadata; + @Nullable + protected final EnvoyServerProtoData.OutlierDetection outlierDetection; + // Resolution status, may contain most recent error encountered. + protected Status status = Status.OK; + // True if has received resolution result. + protected boolean resolved; + // Most recently resolved addresses and config, or null if resource not exists. + @Nullable + protected ClusterResolutionResult result; + + protected boolean shutdown; + + private ClusterState(String name, @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata, + @Nullable EnvoyServerProtoData.OutlierDetection outlierDetection) { + this.name = name; + this.lrsServerInfo = lrsServerInfo; + this.maxConcurrentRequests = maxConcurrentRequests; + this.tlsContext = tlsContext; + this.filterMetadata = ImmutableMap.copyOf(filterMetadata); + this.outlierDetection = outlierDetection; + } + + abstract void start(); + + void shutdown() { + shutdown = true; + } + } + + private final class EdsClusterState extends ClusterState { + @Nullable + private final String edsServiceName; + private Map localityPriorityNames = Collections.emptyMap(); + int priorityNameGenId = 1; + private EdsUpdate edsUpdate; + + private EdsClusterState(String name, @Nullable String edsServiceName, + StatusOr edsUpdate, + @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata, + @Nullable EnvoyServerProtoData.OutlierDetection outlierDetection) { + super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + outlierDetection); + this.edsServiceName = edsServiceName; + if (edsUpdate.hasValue()) { + this.edsUpdate = edsUpdate.getValue(); + } else { + onError(edsUpdate.getStatus()); + } + } + + @Override + void start() { + onChanged(edsUpdate); + } + + @Override + protected void shutdown() { + super.shutdown(); + } + + public void onChanged(final EdsUpdate update) { + class EndpointsUpdated implements Runnable { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update); + if (logger.isLoggable(XdsLogLevel.INFO)) { + logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories", + update.clusterName, update.localityLbEndpointsMap.size(), + update.dropPolicies.size()); + } + Map localityLbEndpoints = + update.localityLbEndpointsMap; + List dropOverloads = update.dropPolicies; + List addresses = new ArrayList<>(); + Map> prioritizedLocalityWeights = new HashMap<>(); + List sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints); + for (Locality locality : localityLbEndpoints.keySet()) { + Endpoints.LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); + String priorityName = localityPriorityNames.get(locality); + boolean discard = true; + for (Endpoints.LbEndpoint endpoint : localityLbInfo.endpoints()) { + if (endpoint.isHealthy()) { + discard = false; + long weight = localityLbInfo.localityWeight(); + if (endpoint.loadBalancingWeight() != 0) { + weight *= endpoint.loadBalancingWeight(); + } + String localityName = localityName(locality); + Attributes attr = + endpoint.eag().getAttributes().toBuilder() + .set(XdsAttributes.ATTR_LOCALITY, locality) + .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, + localityLbInfo.localityWeight()) + .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) + .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname()) + .build(); + EquivalentAddressGroup eag = new EquivalentAddressGroup( + endpoint.eag().getAddresses(), attr); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); + addresses.add(eag); + } + } + if (discard) { + logger.log(XdsLogLevel.INFO, + "Discard locality {0} with 0 healthy endpoints", locality); + continue; + } + if (!prioritizedLocalityWeights.containsKey(priorityName)) { + prioritizedLocalityWeights.put(priorityName, new HashMap()); + } + prioritizedLocalityWeights.get(priorityName).put( + locality, localityLbInfo.localityWeight()); + } + if (prioritizedLocalityWeights.isEmpty()) { + // Will still update the result, as if the cluster resource is revoked. + logger.log(XdsLogLevel.INFO, + "Cluster {0} has no usable priority/locality/endpoint", update.clusterName); + } + sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet()); + Map + priorityChildConfigs = + generateEdsBasedPriorityChildConfigs(name, edsServiceName, + lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + outlierDetection, endpointLbConfig, lbRegistry, prioritizedLocalityWeights, + dropOverloads); + status = Status.OK; + resolved = true; + result = new ClusterResolutionResult(addresses, priorityChildConfigs, + sortedPriorityNames); + handleEndpointResourceUpdate(); + } + } + + new EndpointsUpdated().run(); + } + + private List generatePriorityNames( + String name, Map localityLbEndpoints) { + TreeMap> todo = new TreeMap<>(); + for (Locality locality : localityLbEndpoints.keySet()) { + int priority = localityLbEndpoints.get(locality).priority(); + if (!todo.containsKey(priority)) { + todo.put(priority, new ArrayList<>()); + } + todo.get(priority).add(locality); + } + Map newNames = new HashMap<>(); + Set usedNames = new HashSet<>(); + List ret = new ArrayList<>(); + for (Integer priority: todo.keySet()) { + String foundName = ""; + for (Locality locality : todo.get(priority)) { + if (localityPriorityNames.containsKey(locality) + && usedNames.add(localityPriorityNames.get(locality))) { + foundName = localityPriorityNames.get(locality); + break; + } + } + if ("".equals(foundName)) { + foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++); + } + for (Locality locality : todo.get(priority)) { + newNames.put(locality, foundName); + } + ret.add(foundName); + } + localityPriorityNames = newNames; + return ret; + } + + void onError(final Status error) { + if (shutdown) { + return; + } + String resourceName = edsServiceName != null ? edsServiceName : name; + status = Status.UNAVAILABLE + .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", + resourceName, error.getCode(), error.getDescription())) + .withCause(error.getCause()); + logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); + handleEndpointResolutionError(); + } } - if (xdsClientPool != null) { - xdsClientPool.returnObject(xdsClient); + + private final class LogicalDnsClusterState extends ClusterState { + private final String dnsHostName; + private final NameResolver.Factory nameResolverFactory; + private final NameResolver.Args nameResolverArgs; + private NameResolver resolver; + @Nullable + private BackoffPolicy backoffPolicy; + @Nullable + private SynchronizationContext.ScheduledHandle scheduledRefresh; + + private LogicalDnsClusterState(String name, String dnsHostName, + @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata) { + super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); + this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); + nameResolverFactory = + checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); + nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs"); + } + + @Override + void start() { + URI uri; + try { + uri = new URI("dns", "", "/" + dnsHostName, null); + } catch (URISyntaxException e) { + status = Status.INTERNAL.withDescription( + "Bug, invalid URI creation: " + dnsHostName).withCause(e); + handleEndpointResolutionError(); + return; + } + resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs); + if (resolver == null) { + status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS " + + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri); + handleEndpointResolutionError(); + return; + } + resolver.start(new LogicalDnsClusterState.NameResolverListener(dnsHostName)); + } + + void refresh() { + if (resolver == null) { + return; + } + cancelBackoff(); + resolver.refresh(); + } + + @Override + void shutdown() { + super.shutdown(); + if (resolver != null) { + resolver.shutdown(); + } + cancelBackoff(); + } + + private void cancelBackoff() { + if (scheduledRefresh != null) { + scheduledRefresh.cancel(); + scheduledRefresh = null; + backoffPolicy = null; + } + } + + private class DelayedNameResolverRefresh implements Runnable { + @Override + public void run() { + scheduledRefresh = null; + if (!shutdown) { + resolver.refresh(); + } + } + } + + private class NameResolverListener extends NameResolver.Listener2 { + private final String dnsHostName; + + NameResolverListener(String dnsHostName) { + this.dnsHostName = dnsHostName; + } + + @Override + public void onResult(final NameResolver.ResolutionResult resolutionResult) { + class NameResolved implements Runnable { + @Override + public void run() { + if (shutdown) { + return; + } + backoffPolicy = null; // reset backoff sequence if succeeded + // Arbitrary priority notation for all DNS-resolved endpoints. + String priorityName = priorityName(name, 0); // value doesn't matter + List addresses = new ArrayList<>(); + for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { + // No weight attribute is attached, all endpoint-level LB policy should be able + // to handle such it. + String localityName = localityName(XdsNameResolver.LOGICAL_DNS_CLUSTER_LOCALITY); + Attributes attr = eag.getAttributes().toBuilder() + .set(XdsAttributes.ATTR_LOCALITY, XdsNameResolver.LOGICAL_DNS_CLUSTER_LOCALITY) + .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) + .build(); + eag = new EquivalentAddressGroup(eag.getAddresses(), attr); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); + addresses.add(eag); + } + PriorityChildConfig priorityChildConfig = + generateDnsBasedPriorityChildConfig( + name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + lbRegistry, Collections.emptyList()); + status = Status.OK; + resolved = true; + result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); + handleEndpointResourceUpdate(); + } + } + + syncContext.execute(new NameResolved()); + } + + @Override + public void onError(final Status error) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + status = error; + // NameResolver.Listener API cannot distinguish between address-not-found and + // transient errors. If the error occurs in the first resolution, treat it as + // address not found. Otherwise, either there is previously resolved addresses + // previously encountered error, propagate the error to downstream/upstream and + // let downstream/upstream handle it. + if (!resolved) { + resolved = true; + handleEndpointResourceUpdate(); + } else { + handleEndpointResolutionError(); + } + if (scheduledRefresh != null && scheduledRefresh.isPending()) { + return; + } + if (backoffPolicy == null) { + backoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = backoffPolicy.nextBackoffNanos(); + logger.log(XdsLogLevel.DEBUG, + "Logical DNS resolver for cluster {0} encountered name resolution " + + "error: {1}, scheduling DNS resolution backoff for {2} ns", + name, error, delayNanos); + scheduledRefresh = + syncContext.schedule( + new LogicalDnsClusterState.DelayedNameResolverRefresh(), delayNanos, + TimeUnit.NANOSECONDS, helper.getScheduledExecutorService()); + } + }); + } + } + } + } + + static class ClusterResolutionResult { + // Endpoint addresses. + private final List addresses; + // Config (include load balancing policy/config) for each priority in the cluster. + private final Map priorityChildConfigs; + // List of priority names ordered in descending priorities. + private final List priorities; + + ClusterResolutionResult(List addresses, String priority, + PriorityChildConfig config) { + this(addresses, Collections.singletonMap(priority, config), + Collections.singletonList(priority)); + } + + ClusterResolutionResult(List addresses, + Map configs, List priorities) { + this.addresses = addresses; + this.priorityChildConfigs = configs; + this.priorities = priorities; + } + } + + static final class ClusterResolverConfig { + // Ordered list of clusters to be resolved. + final List discoveryMechanisms; + // GracefulSwitch configuration + final Object lbConfig; + + ClusterResolverConfig(List discoveryMechanisms, Object lbConfig) { + this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms"); + this.lbConfig = checkNotNull(lbConfig, "lbConfig"); + } + + @Override + public int hashCode() { + return Objects.hash(discoveryMechanisms, lbConfig); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClusterResolverConfig that = (ClusterResolverConfig) o; + return discoveryMechanisms.equals(that.discoveryMechanisms) + && lbConfig.equals(that.lbConfig); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("discoveryMechanisms", discoveryMechanisms) + .add("lbConfig", lbConfig) + .toString(); + } + + // Describes the mechanism for a specific cluster. + static final class DiscoveryMechanism { + // Name of the cluster to resolve. + final String cluster; + // Type of the cluster. + final Type type; + // Load reporting server info. Null if not enabled. + @Nullable + final Bootstrapper.ServerInfo lrsServerInfo; + // Cluster-level max concurrent request threshold. Null if not specified. + @Nullable + final Long maxConcurrentRequests; + // TLS context for connections to endpoints in the cluster. + @Nullable + final EnvoyServerProtoData.UpstreamTlsContext tlsContext; + // Resource name for resolving endpoints via EDS. Only valid for EDS clusters. + @Nullable + final String edsServiceName; + // Hostname for resolving endpoints via DNS. Only valid for LOGICAL_DNS clusters. + @Nullable + final String dnsHostName; + @Nullable + final EnvoyServerProtoData.OutlierDetection outlierDetection; + final Map filterMetadata; + final StatusOr endpointConfig; + + enum Type { + EDS, + LOGICAL_DNS, + } + + private DiscoveryMechanism( + String cluster, Type type, @Nullable String edsServiceName, + @Nullable String dnsHostName, @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata, + @Nullable EnvoyServerProtoData.OutlierDetection outlierDetection, + @Nullable StatusOr endpointConfig) { + this.cluster = checkNotNull(cluster, "cluster"); + this.type = checkNotNull(type, "type"); + this.edsServiceName = edsServiceName; + this.dnsHostName = dnsHostName; + this.lrsServerInfo = lrsServerInfo; + this.maxConcurrentRequests = maxConcurrentRequests; + this.tlsContext = tlsContext; + this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata")); + this.outlierDetection = outlierDetection; + this.endpointConfig = endpointConfig; + } + + static DiscoveryMechanism forEds( + String cluster, + @Nullable String edsServiceName, + @Nullable Bootstrapper.ServerInfo lrsServerInfo, + @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata, + EnvoyServerProtoData.OutlierDetection outlierDetection, + StatusOr endpointConfig) { + return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo, + maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection, endpointConfig); + } + + static DiscoveryMechanism forLogicalDns( + String cluster, String dnsHostName, + @Nullable Bootstrapper.ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, + @Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext, + Map filterMetadata) { + return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName, + lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null, null); + } + + @Override + public int hashCode() { + return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext, + edsServiceName, dnsHostName, filterMetadata, outlierDetection); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DiscoveryMechanism that = (DiscoveryMechanism) o; + return cluster.equals(that.cluster) + && type == that.type + && Objects.equals(edsServiceName, that.edsServiceName) + && Objects.equals(dnsHostName, that.dnsHostName) + && Objects.equals(lrsServerInfo, that.lrsServerInfo) + && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) + && Objects.equals(tlsContext, that.tlsContext) + && Objects.equals(filterMetadata, that.filterMetadata) + && Objects.equals(outlierDetection, that.outlierDetection); + } + + @Override + public String toString() { + MoreObjects.ToStringHelper toStringHelper = + MoreObjects.toStringHelper(this) + .add("cluster", cluster) + .add("type", type) + .add("edsServiceName", edsServiceName) + .add("dnsHostName", dnsHostName) + .add("lrsServerInfo", lrsServerInfo) + // Exclude tlsContext as its string representation is cumbersome. + .add("maxConcurrentRequests", maxConcurrentRequests) + .add("filterMetadata", filterMetadata) + // Exclude outlierDetection as its string representation is long. + ; + return toStringHelper.toString(); + } } } @@ -123,14 +1021,15 @@ public void shutdown() { * The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when * receiving the CDS LB policy config with the top-level cluster name. */ - private final class CdsLbState { + final class CdsLbState { - private final ClusterState root; - private final Map clusterStates = new ConcurrentHashMap<>(); + private final ClusterStateDetails root; + private final Map clusterStates = new ConcurrentHashMap<>(); private LoadBalancer childLb; - private CdsLbState(String rootCluster) { - root = new ClusterState(rootCluster); + private CdsLbState(String rootCluster, StatusOr rootClusterConfig) { + root = new ClusterStateDetails(rootCluster, rootClusterConfig); + clusterStates.put(rootCluster, root); } private void start() { @@ -144,24 +1043,66 @@ private void shutdown() { } } + void update(XdsConfig xdsConfig) { + List addedClusters = new ArrayList<>(); + List existingClusters = new ArrayList<>(); + + // TODO Need to get the root cluster name + for (Map.Entry> entry + : xdsConfig.getClusters().entrySet()) { + String clusterName = entry.getKey(); + StatusOr configOr = entry.getValue(); + ClusterStateDetails clusterState = clusterStates.get(clusterName); + if (clusterState == null) { + clusterState = new ClusterStateDetails(clusterName, configOr); + addedClusters.add(clusterName); + clusterStates.put(clusterName, clusterState); + } else { + existingClusters.add(clusterName); + } + } + + // Cleanup removed ones + List removedClusters = new ArrayList<>(); + for (String clusterName : clusterStates.keySet()) { + if (!addedClusters.contains(clusterName) && !existingClusters.contains(clusterName)) { + clusterStates.get(clusterName).shutdown(); + removedClusters.add(clusterName); + } + } + removedClusters.forEach(clusterStates::remove); + + for (StatusOr clusterConfigOr: xdsConfig.getClusters().values()) { + if (!clusterConfigOr.hasValue()) { + continue; + } + XdsConfig.XdsClusterConfig clusterConfig = clusterConfigOr.getValue(); + String clusterName = clusterConfig.getClusterName(); + ClusterStateDetails clusterState = clusterStates.get(clusterName); + if (clusterState.shutdown) { + clusterState.shutdown = false; + } + clusterState.update(clusterConfig.getClusterResource(), clusterConfig.getEndpoint()); + } + + handleClusterDiscovered(); + } + private void handleClusterDiscovered() { List instances = new ArrayList<>(); // Used for loop detection to break the infinite recursion that loops would cause - Map> parentClusters = new HashMap<>(); + Map> parentClusters = new HashMap<>(); Status loopStatus = null; // Level-order traversal. // Collect configurations for all non-aggregate (leaf) clusters. - Queue queue = new ArrayDeque<>(); + Queue queue = new ArrayDeque<>(); queue.add(root); while (!queue.isEmpty()) { int size = queue.size(); for (int i = 0; i < size; i++) { - ClusterState clusterState = queue.remove(); - if (!clusterState.discovered) { - return; // do not proceed until all clusters discovered - } + ClusterStateDetails clusterState = queue.remove(); if (clusterState.result == null) { // resource revoked or not exists continue; } @@ -175,7 +1116,8 @@ private void handleClusterDiscovered() { clusterState.result.maxConcurrentRequests(), clusterState.result.upstreamTlsContext(), clusterState.result.filterMetadata(), - clusterState.result.outlierDetection()); + clusterState.result.outlierDetection(), + clusterState.getEndpointConfigStatusOr()); } else { // logical DNS instance = DiscoveryMechanism.forLogicalDns( clusterState.name, clusterState.result.dnsHostName(), @@ -206,9 +1148,8 @@ private void handleClusterDiscovered() { } loopStatus = Status.UNAVAILABLE.withDescription(String.format( "CDS error: circular aggregate clusters directly under %s for " - + "root cluster %s, named %s, xDS node ID: %s", - clusterState.name, root.name, namesCausingLoops, - xdsClient.getBootstrapInfo().node().getId())); + + "root cluster %s, named %s", + clusterState.name, root.name, namesCausingLoops)); } } } @@ -226,8 +1167,7 @@ private void handleClusterDiscovered() { childLb = null; } Status unavailable = Status.UNAVAILABLE.withDescription(String.format( - "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster %s" - + " xDS node ID: %s", root.name, xdsClient.getBootstrapInfo().node().getId())); + "CDS error: found 0 leaf (logical DNS or EDS) clusters for root cluster " + root.name)); helper.updateBalancingState( TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable))); return; @@ -245,24 +1185,29 @@ private void handleClusterDiscovered() { ClusterResolverConfig config = new ClusterResolverConfig( Collections.unmodifiableList(instances), configOrError.getConfig()); if (childLb == null) { - childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); + logger.log(XdsLogLevel.DEBUG, "Config: {0}", config); + childLb = new GracefulSwitchLoadBalancer(helper); } + Object gracefulConfig = + GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + new ClusterResolverLbStateFactory(), config); childLb.handleResolvedAddresses( - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build()); + resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build()); } /** * Returns children that would cause loops and builds up the parentClusters map. **/ - private List identifyLoops(ClusterState clusterState, - Map> parentClusters) { + private List identifyLoops( + ClusterStateDetails clusterState, + Map> parentClusters) { Set ancestors = new HashSet<>(); ancestors.add(clusterState.name); addAncestors(ancestors, clusterState, parentClusters); List namesCausingLoops = new ArrayList<>(); - for (ClusterState state : clusterState.childClusterStates.values()) { + for (ClusterStateDetails state : clusterState.childClusterStates.values()) { if (ancestors.contains(state.name)) { namesCausingLoops.add(state.name); } @@ -279,103 +1224,80 @@ private List identifyLoops(ClusterState clusterState, } /** Recursively add all parents to the ancestors list. **/ - private void addAncestors(Set ancestors, ClusterState clusterState, - Map> parentClusters) { - List directParents = parentClusters.get(clusterState); + private void addAncestors(Set ancestors, ClusterStateDetails clusterState, + Map> parentClusters) { + List directParents = parentClusters.get(clusterState); if (directParents != null) { directParents.stream().map(c -> c.name).forEach(ancestors::add); directParents.forEach(p -> addAncestors(ancestors, p, parentClusters)); } } - private void handleClusterDiscoveryError(Status error) { - String description = error.getDescription() == null ? "" : error.getDescription() + " "; - Status errorWithNodeId = error.withDescription( - description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId()); - if (childLb != null) { - childLb.handleNameResolutionError(errorWithNodeId); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(errorWithNodeId))); - } - } - - private final class ClusterState implements ResourceWatcher { + private final class ClusterStateDetails { private final String name; @Nullable - private Map childClusterStates; + private Map childClusterStates; @Nullable private CdsUpdate result; // Following fields are effectively final. private boolean isLeaf; - private boolean discovered; private boolean shutdown; + private EdsUpdate endpointConfig; + private Status error; - private ClusterState(String name) { + private ClusterStateDetails(String name, StatusOr configOr) { this.name = name; + if (configOr.hasValue()) { + XdsConfig.XdsClusterConfig config = configOr.getValue(); + this.result = config.getClusterResource(); + this.isLeaf = result.clusterType() != ClusterType.AGGREGATE; + if (config.getEndpoint() != null) { + if (config.getEndpoint().hasValue()) { + endpointConfig = config.getEndpoint().getValue(); + } else { + this.error = config.getEndpoint().getStatus(); + this.result = null; + } + } + } else { + this.error = configOr.getStatus(); + } + } + + StatusOr getEndpointConfigStatusOr() { + return (error == null) ? StatusOr.fromValue(endpointConfig) : StatusOr.fromStatus(error); } private void start() { shutdown = false; - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext); + if (error != null) { + return; + } + update(result, StatusOr.fromValue(endpointConfig)); } void shutdown() { shutdown = true; - xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this); if (childClusterStates != null) { // recursively shut down all descendants childClusterStates.values().stream() .filter(state -> !state.shutdown) - .forEach(ClusterState::shutdown); - } - } - - @Override - public void onError(Status error) { - Status status = Status.UNAVAILABLE - .withDescription( - String.format("Unable to load CDS %s. xDS server returned: %s: %s", - name, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - if (shutdown) { - return; - } - // All watchers should receive the same error, so we only propagate it once. - if (ClusterState.this == root) { - handleClusterDiscoveryError(status); - } - } - - @Override - public void onResourceDoesNotExist(String resourceName) { - if (shutdown) { - return; - } - discovered = true; - result = null; - if (childClusterStates != null) { - for (ClusterState state : childClusterStates.values()) { - state.shutdown(); - } - childClusterStates = null; + .forEach(ClusterStateDetails::shutdown); } - handleClusterDiscovered(); } - @Override - public void onChanged(final CdsUpdate update) { + private void update(final CdsUpdate update, StatusOr endpointConfig) { if (shutdown) { return; } logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); - discovered = true; result = update; if (update.clusterType() == ClusterType.AGGREGATE) { isLeaf = false; logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", update.clusterName(), update.prioritizedClusterNames()); - Map newChildStates = new LinkedHashMap<>(); + Map newChildStates = new LinkedHashMap<>(); + for (String cluster : update.prioritizedClusterNames()) { if (newChildStates.containsKey(cluster)) { logger.log(XdsLogLevel.WARNING, @@ -384,39 +1306,40 @@ public void onChanged(final CdsUpdate update) { continue; } if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { - ClusterState childState; - if (clusterStates.containsKey(cluster)) { - childState = clusterStates.get(cluster); - if (childState.shutdown) { - childState.start(); - } - } else { - childState = new ClusterState(cluster); - clusterStates.put(cluster, childState); - childState.start(); + ClusterStateDetails childState = clusterStates.get(cluster); + if (childState.shutdown) { + childState.shutdown = false; } newChildStates.put(cluster, childState); } else { newChildStates.put(cluster, childClusterStates.remove(cluster)); } } + if (childClusterStates != null) { // stop subscribing to revoked child clusters - for (ClusterState watcher : childClusterStates.values()) { - watcher.shutdown(); + for (ClusterStateDetails oldChildState : childClusterStates.values()) { + if (!newChildStates.containsKey(oldChildState.name)) { + oldChildState.shutdown(); + } } } childClusterStates = newChildStates; } else if (update.clusterType() == ClusterType.EDS) { isLeaf = true; - logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName(), update.edsServiceName()); + assert endpointConfig != null; + if (endpointConfig.getStatus() != null) { + logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}, error: {2}", + update.clusterName(), update.edsServiceName(), endpointConfig.getStatus()); + } else { + logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", + update.clusterName(), update.edsServiceName()); + this.endpointConfig = endpointConfig.getValue(); + } } else { // logical DNS isLeaf = true; logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); } - handleClusterDiscovered(); } - } } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 4e08ddc5973..276cd459e5c 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -16,65 +16,20 @@ package io.grpc.xds; -import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; - import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Struct; -import io.grpc.Attributes; -import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; -import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; -import io.grpc.NameResolver; -import io.grpc.NameResolver.ResolutionResult; import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.ObjectPool; -import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; -import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; -import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; -import io.grpc.xds.Endpoints.DropOverload; -import io.grpc.xds.Endpoints.LbEndpoint; -import io.grpc.xds.Endpoints.LocalityLbEndpoints; -import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection; -import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; -import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; -import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; -import io.grpc.xds.XdsEndpointResource.EdsUpdate; -import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.Locality; +import io.grpc.xds.CdsLoadBalancer2.ClusterResolverConfig; import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; /** * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy @@ -84,14 +39,7 @@ * used in the downstream LB policies for fine-grained load balancing purposes. */ final class ClusterResolverLoadBalancer extends LoadBalancer { - // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode - // to an empty locality. - private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", ""); private final XdsLogger logger; - private final SynchronizationContext syncContext; - private final ScheduledExecutorService timeService; - private final LoadBalancerRegistry lbRegistry; - private final BackoffPolicy.Provider backoffPolicyProvider; private final GracefulSwitchLoadBalancer delegate; private ObjectPool xdsClientPool; private XdsClient xdsClient; @@ -105,10 +53,6 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { @VisibleForTesting ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, BackoffPolicy.Provider backoffPolicyProvider) { - this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); - this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); - this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); delegate = new GracefulSwitchLoadBalancer(helper); logger = XdsLogger.withLogId( InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority())); @@ -128,7 +72,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { logger.log(XdsLogLevel.DEBUG, "Config: {0}", config); this.config = config; Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new ClusterResolverLbStateFactory(), config); + null, config); // TODO intentionally broken as class should go away delegate.handleResolvedAddresses( resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build()); } @@ -150,724 +94,4 @@ public void shutdown() { } } - private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory { - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return new ClusterResolverLbState(helper); - } - } - - /** - * The state of a cluster_resolver LB working session. A new instance is created whenever - * the cluster_resolver LB receives a new config. The old instance is replaced when the - * new one is ready to handle new RPCs. - */ - private final class ClusterResolverLbState extends LoadBalancer { - private final Helper helper; - private final List clusters = new ArrayList<>(); - private final Map clusterStates = new HashMap<>(); - private Object endpointLbConfig; - private ResolvedAddresses resolvedAddresses; - private LoadBalancer childLb; - - ClusterResolverLbState(Helper helper) { - this.helper = new RefreshableHelper(checkNotNull(helper, "helper")); - logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState"); - } - - @Override - public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - this.resolvedAddresses = resolvedAddresses; - ClusterResolverConfig config = - (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - endpointLbConfig = config.lbConfig; - for (DiscoveryMechanism instance : config.discoveryMechanisms) { - clusters.add(instance.cluster); - ClusterState state; - if (instance.type == DiscoveryMechanism.Type.EDS) { - state = new EdsClusterState(instance.cluster, instance.edsServiceName, - instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata, instance.outlierDetection); - } else { // logical DNS - state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, - instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata); - } - clusterStates.put(instance.cluster, state); - state.start(); - } - return Status.OK; - } - - @Override - public void handleNameResolutionError(Status error) { - if (childLb != null) { - childLb.handleNameResolutionError(error); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); - } - } - - @Override - public void shutdown() { - for (ClusterState state : clusterStates.values()) { - state.shutdown(); - } - if (childLb != null) { - childLb.shutdown(); - } - } - - private void handleEndpointResourceUpdate() { - List addresses = new ArrayList<>(); - Map priorityChildConfigs = new HashMap<>(); - List priorities = new ArrayList<>(); // totally ordered priority list - - Status endpointNotFound = Status.OK; - for (String cluster : clusters) { - ClusterState state = clusterStates.get(cluster); - // Propagate endpoints to the child LB policy only after all clusters have been resolved. - if (!state.resolved && state.status.isOk()) { - return; - } - if (state.result != null) { - addresses.addAll(state.result.addresses); - priorityChildConfigs.putAll(state.result.priorityChildConfigs); - priorities.addAll(state.result.priorities); - } else { - endpointNotFound = state.status; - } - } - if (addresses.isEmpty()) { - if (endpointNotFound.isOk()) { - endpointNotFound = Status.UNAVAILABLE.withDescription( - "No usable endpoint from cluster(s): " + clusters); - } else { - endpointNotFound = - Status.UNAVAILABLE.withCause(endpointNotFound.getCause()) - .withDescription(endpointNotFound.getDescription()); - } - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound))); - if (childLb != null) { - childLb.shutdown(); - childLb = null; - } - return; - } - PriorityLbConfig childConfig = - new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs), - Collections.unmodifiableList(priorities)); - if (childLb == null) { - childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper); - } - childLb.handleResolvedAddresses( - resolvedAddresses.toBuilder() - .setLoadBalancingPolicyConfig(childConfig) - .setAddresses(Collections.unmodifiableList(addresses)) - .build()); - } - - private void handleEndpointResolutionError() { - boolean allInError = true; - Status error = null; - for (String cluster : clusters) { - ClusterState state = clusterStates.get(cluster); - if (state.status.isOk()) { - allInError = false; - } else { - error = state.status; - } - } - if (allInError) { - if (childLb != null) { - childLb.handleNameResolutionError(error); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); - } - } - } - - /** - * Wires re-resolution requests from downstream LB policies with DNS resolver. - */ - private final class RefreshableHelper extends ForwardingLoadBalancerHelper { - private final Helper delegate; - - private RefreshableHelper(Helper delegate) { - this.delegate = checkNotNull(delegate, "delegate"); - } - - @Override - public void refreshNameResolution() { - for (ClusterState state : clusterStates.values()) { - if (state instanceof LogicalDnsClusterState) { - ((LogicalDnsClusterState) state).refresh(); - } - } - } - - @Override - protected Helper delegate() { - return delegate; - } - } - - /** - * Resolution state of an underlying cluster. - */ - private abstract class ClusterState { - // Name of the cluster to be resolved. - protected final String name; - @Nullable - protected final ServerInfo lrsServerInfo; - @Nullable - protected final Long maxConcurrentRequests; - @Nullable - protected final UpstreamTlsContext tlsContext; - protected final Map filterMetadata; - @Nullable - protected final OutlierDetection outlierDetection; - // Resolution status, may contain most recent error encountered. - protected Status status = Status.OK; - // True if has received resolution result. - protected boolean resolved; - // Most recently resolved addresses and config, or null if resource not exists. - @Nullable - protected ClusterResolutionResult result; - - protected boolean shutdown; - - private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { - this.name = name; - this.lrsServerInfo = lrsServerInfo; - this.maxConcurrentRequests = maxConcurrentRequests; - this.tlsContext = tlsContext; - this.filterMetadata = ImmutableMap.copyOf(filterMetadata); - this.outlierDetection = outlierDetection; - } - - abstract void start(); - - void shutdown() { - shutdown = true; - } - } - - private final class EdsClusterState extends ClusterState implements ResourceWatcher { - @Nullable - private final String edsServiceName; - private Map localityPriorityNames = Collections.emptyMap(); - int priorityNameGenId = 1; - - private EdsClusterState(String name, @Nullable String edsServiceName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - @Nullable OutlierDetection outlierDetection) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - outlierDetection); - this.edsServiceName = edsServiceName; - } - - @Override - void start() { - String resourceName = edsServiceName != null ? edsServiceName : name; - logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), - resourceName, this, syncContext); - } - - @Override - protected void shutdown() { - super.shutdown(); - String resourceName = edsServiceName != null ? edsServiceName : name; - logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName); - xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this); - } - - @Override - public void onChanged(final EdsUpdate update) { - class EndpointsUpdated implements Runnable { - @Override - public void run() { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update); - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories", - update.clusterName, update.localityLbEndpointsMap.size(), - update.dropPolicies.size()); - } - Map localityLbEndpoints = - update.localityLbEndpointsMap; - List dropOverloads = update.dropPolicies; - List addresses = new ArrayList<>(); - Map> prioritizedLocalityWeights = new HashMap<>(); - List sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints); - for (Locality locality : localityLbEndpoints.keySet()) { - LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); - String priorityName = localityPriorityNames.get(locality); - boolean discard = true; - for (LbEndpoint endpoint : localityLbInfo.endpoints()) { - if (endpoint.isHealthy()) { - discard = false; - long weight = localityLbInfo.localityWeight(); - if (endpoint.loadBalancingWeight() != 0) { - weight *= endpoint.loadBalancingWeight(); - } - String localityName = localityName(locality); - Attributes attr = - endpoint.eag().getAttributes().toBuilder() - .set(XdsAttributes.ATTR_LOCALITY, locality) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) - .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, - localityLbInfo.localityWeight()) - .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) - .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname()) - .build(); - EquivalentAddressGroup eag = new EquivalentAddressGroup( - endpoint.eag().getAddresses(), attr); - eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); - addresses.add(eag); - } - } - if (discard) { - logger.log(XdsLogLevel.INFO, - "Discard locality {0} with 0 healthy endpoints", locality); - continue; - } - if (!prioritizedLocalityWeights.containsKey(priorityName)) { - prioritizedLocalityWeights.put(priorityName, new HashMap()); - } - prioritizedLocalityWeights.get(priorityName).put( - locality, localityLbInfo.localityWeight()); - } - if (prioritizedLocalityWeights.isEmpty()) { - // Will still update the result, as if the cluster resource is revoked. - logger.log(XdsLogLevel.INFO, - "Cluster {0} has no usable priority/locality/endpoint", update.clusterName); - } - sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet()); - Map priorityChildConfigs = - generateEdsBasedPriorityChildConfigs( - name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext, - filterMetadata, outlierDetection, endpointLbConfig, lbRegistry, - prioritizedLocalityWeights, dropOverloads); - status = Status.OK; - resolved = true; - result = new ClusterResolutionResult(addresses, priorityChildConfigs, - sortedPriorityNames); - handleEndpointResourceUpdate(); - } - } - - new EndpointsUpdated().run(); - } - - private List generatePriorityNames(String name, - Map localityLbEndpoints) { - TreeMap> todo = new TreeMap<>(); - for (Locality locality : localityLbEndpoints.keySet()) { - int priority = localityLbEndpoints.get(locality).priority(); - if (!todo.containsKey(priority)) { - todo.put(priority, new ArrayList<>()); - } - todo.get(priority).add(locality); - } - Map newNames = new HashMap<>(); - Set usedNames = new HashSet<>(); - List ret = new ArrayList<>(); - for (Integer priority: todo.keySet()) { - String foundName = ""; - for (Locality locality : todo.get(priority)) { - if (localityPriorityNames.containsKey(locality) - && usedNames.add(localityPriorityNames.get(locality))) { - foundName = localityPriorityNames.get(locality); - break; - } - } - if ("".equals(foundName)) { - foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++); - } - for (Locality locality : todo.get(priority)) { - newNames.put(locality, foundName); - } - ret.add(foundName); - } - localityPriorityNames = newNames; - return ret; - } - - @Override - public void onResourceDoesNotExist(final String resourceName) { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); - status = Status.OK; - resolved = true; - result = null; // resource revoked - handleEndpointResourceUpdate(); - } - - @Override - public void onError(final Status error) { - if (shutdown) { - return; - } - String resourceName = edsServiceName != null ? edsServiceName : name; - status = Status.UNAVAILABLE - .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); - handleEndpointResolutionError(); - } - } - - private final class LogicalDnsClusterState extends ClusterState { - private final String dnsHostName; - private final NameResolver.Factory nameResolverFactory; - private final NameResolver.Args nameResolverArgs; - private NameResolver resolver; - @Nullable - private BackoffPolicy backoffPolicy; - @Nullable - private ScheduledHandle scheduledRefresh; - - private LogicalDnsClusterState(String name, String dnsHostName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); - this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); - nameResolverFactory = - checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); - nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs"); - } - - @Override - void start() { - URI uri; - try { - uri = new URI("dns", "", "/" + dnsHostName, null); - } catch (URISyntaxException e) { - status = Status.INTERNAL.withDescription( - "Bug, invalid URI creation: " + dnsHostName).withCause(e); - handleEndpointResolutionError(); - return; - } - resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs); - if (resolver == null) { - status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS " - + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri); - handleEndpointResolutionError(); - return; - } - resolver.start(new NameResolverListener(dnsHostName)); - } - - void refresh() { - if (resolver == null) { - return; - } - cancelBackoff(); - resolver.refresh(); - } - - @Override - void shutdown() { - super.shutdown(); - if (resolver != null) { - resolver.shutdown(); - } - cancelBackoff(); - } - - private void cancelBackoff() { - if (scheduledRefresh != null) { - scheduledRefresh.cancel(); - scheduledRefresh = null; - backoffPolicy = null; - } - } - - private class DelayedNameResolverRefresh implements Runnable { - @Override - public void run() { - scheduledRefresh = null; - if (!shutdown) { - resolver.refresh(); - } - } - } - - private class NameResolverListener extends NameResolver.Listener2 { - private final String dnsHostName; - - NameResolverListener(String dnsHostName) { - this.dnsHostName = dnsHostName; - } - - @Override - public void onResult(final ResolutionResult resolutionResult) { - class NameResolved implements Runnable { - @Override - public void run() { - if (shutdown) { - return; - } - backoffPolicy = null; // reset backoff sequence if succeeded - // Arbitrary priority notation for all DNS-resolved endpoints. - String priorityName = priorityName(name, 0); // value doesn't matter - List addresses = new ArrayList<>(); - for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { - // No weight attribute is attached, all endpoint-level LB policy should be able - // to handle such it. - String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); - Attributes attr = eag.getAttributes().toBuilder() - .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) - .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) - .build(); - eag = new EquivalentAddressGroup(eag.getAddresses(), attr); - eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); - addresses.add(eag); - } - PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( - name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - lbRegistry, Collections.emptyList()); - status = Status.OK; - resolved = true; - result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); - handleEndpointResourceUpdate(); - } - } - - syncContext.execute(new NameResolved()); - } - - @Override - public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - status = error; - // NameResolver.Listener API cannot distinguish between address-not-found and - // transient errors. If the error occurs in the first resolution, treat it as - // address not found. Otherwise, either there is previously resolved addresses - // previously encountered error, propagate the error to downstream/upstream and - // let downstream/upstream handle it. - if (!resolved) { - resolved = true; - handleEndpointResourceUpdate(); - } else { - handleEndpointResolutionError(); - } - if (scheduledRefresh != null && scheduledRefresh.isPending()) { - return; - } - if (backoffPolicy == null) { - backoffPolicy = backoffPolicyProvider.get(); - } - long delayNanos = backoffPolicy.nextBackoffNanos(); - logger.log(XdsLogLevel.DEBUG, - "Logical DNS resolver for cluster {0} encountered name resolution " - + "error: {1}, scheduling DNS resolution backoff for {2} ns", - name, error, delayNanos); - scheduledRefresh = - syncContext.schedule( - new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, - timeService); - } - }); - } - } - } - } - - private static class ClusterResolutionResult { - // Endpoint addresses. - private final List addresses; - // Config (include load balancing policy/config) for each priority in the cluster. - private final Map priorityChildConfigs; - // List of priority names ordered in descending priorities. - private final List priorities; - - ClusterResolutionResult(List addresses, String priority, - PriorityChildConfig config) { - this(addresses, Collections.singletonMap(priority, config), - Collections.singletonList(priority)); - } - - ClusterResolutionResult(List addresses, - Map configs, List priorities) { - this.addresses = addresses; - this.priorityChildConfigs = configs; - this.priorities = priorities; - } - } - - /** - * Generates the config to be used in the priority LB policy for the single priority of - * logical DNS cluster. - * - *

priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first - */ - private static PriorityChildConfig generateDnsBasedPriorityChildConfig( - String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - LoadBalancerRegistry lbRegistry, List dropOverloads) { - // Override endpoint-level LB policy with pick_first for logical DNS cluster. - Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - lbRegistry.getProvider("pick_first"), null); - ClusterImplConfig clusterImplConfig = - new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); - LoadBalancerProvider clusterImplLbProvider = - lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); - Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - clusterImplLbProvider, clusterImplConfig); - return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/); - } - - /** - * Generates configs to be used in the priority LB policy for priorities in an EDS cluster. - * - *

priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB - * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental - */ - private static Map generateEdsBasedPriorityChildConfigs( - String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, - @Nullable OutlierDetection outlierDetection, Object endpointLbConfig, - LoadBalancerRegistry lbRegistry, Map> prioritizedLocalityWeights, List dropOverloads) { - Map configs = new HashMap<>(); - for (String priority : prioritizedLocalityWeights.keySet()) { - ClusterImplConfig clusterImplConfig = - new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); - LoadBalancerProvider clusterImplLbProvider = - lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); - Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - clusterImplLbProvider, clusterImplConfig); - - // If outlier detection has been configured we wrap the child policy in the outlier detection - // load balancer. - if (outlierDetection != null) { - LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider( - "outlier_detection_experimental"); - priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - outlierDetectionProvider, - buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy)); - } - - PriorityChildConfig priorityChildConfig = - new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */); - configs.put(priority, priorityChildConfig); - } - return configs; - } - - /** - * Converts {@link OutlierDetection} that represents the xDS configuration to {@link - * OutlierDetectionLoadBalancerConfig} that the {@link io.grpc.util.OutlierDetectionLoadBalancer} - * understands. - */ - private static OutlierDetectionLoadBalancerConfig buildOutlierDetectionLbConfig( - OutlierDetection outlierDetection, Object childConfig) { - OutlierDetectionLoadBalancerConfig.Builder configBuilder - = new OutlierDetectionLoadBalancerConfig.Builder(); - - configBuilder.setChildConfig(childConfig); - - if (outlierDetection.intervalNanos() != null) { - configBuilder.setIntervalNanos(outlierDetection.intervalNanos()); - } - if (outlierDetection.baseEjectionTimeNanos() != null) { - configBuilder.setBaseEjectionTimeNanos(outlierDetection.baseEjectionTimeNanos()); - } - if (outlierDetection.maxEjectionTimeNanos() != null) { - configBuilder.setMaxEjectionTimeNanos(outlierDetection.maxEjectionTimeNanos()); - } - if (outlierDetection.maxEjectionPercent() != null) { - configBuilder.setMaxEjectionPercent(outlierDetection.maxEjectionPercent()); - } - - SuccessRateEjection successRate = outlierDetection.successRateEjection(); - if (successRate != null) { - OutlierDetectionLoadBalancerConfig.SuccessRateEjection.Builder - successRateConfigBuilder = new OutlierDetectionLoadBalancerConfig - .SuccessRateEjection.Builder(); - - if (successRate.stdevFactor() != null) { - successRateConfigBuilder.setStdevFactor(successRate.stdevFactor()); - } - if (successRate.enforcementPercentage() != null) { - successRateConfigBuilder.setEnforcementPercentage(successRate.enforcementPercentage()); - } - if (successRate.minimumHosts() != null) { - successRateConfigBuilder.setMinimumHosts(successRate.minimumHosts()); - } - if (successRate.requestVolume() != null) { - successRateConfigBuilder.setRequestVolume(successRate.requestVolume()); - } - - configBuilder.setSuccessRateEjection(successRateConfigBuilder.build()); - } - - FailurePercentageEjection failurePercentage = outlierDetection.failurePercentageEjection(); - if (failurePercentage != null) { - OutlierDetectionLoadBalancerConfig.FailurePercentageEjection.Builder - failurePercentageConfigBuilder = new OutlierDetectionLoadBalancerConfig - .FailurePercentageEjection.Builder(); - - if (failurePercentage.threshold() != null) { - failurePercentageConfigBuilder.setThreshold(failurePercentage.threshold()); - } - if (failurePercentage.enforcementPercentage() != null) { - failurePercentageConfigBuilder.setEnforcementPercentage( - failurePercentage.enforcementPercentage()); - } - if (failurePercentage.minimumHosts() != null) { - failurePercentageConfigBuilder.setMinimumHosts(failurePercentage.minimumHosts()); - } - if (failurePercentage.requestVolume() != null) { - failurePercentageConfigBuilder.setRequestVolume(failurePercentage.requestVolume()); - } - - configBuilder.setFailurePercentageEjection(failurePercentageConfigBuilder.build()); - } - - return configBuilder.build(); - } - - /** - * Generates a string that represents the priority in the LB policy config. The string is unique - * across priorities in all clusters and priorityName(c, p1) < priorityName(c, p2) iff p1 < p2. - * The ordering is undefined for priorities in different clusters. - */ - private static String priorityName(String cluster, int priority) { - return cluster + "[child" + priority + "]"; - } - - /** - * Generates a string that represents the locality in the LB policy config. The string is unique - * across all localities in all clusters. - */ - private static String localityName(Locality locality) { - return "{region=\"" + locality.region() - + "\", zone=\"" + locality.zone() - + "\", sub_zone=\"" + locality.subZone() - + "\"}"; - } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index 2301cb670e0..030e4a4f573 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -16,24 +16,13 @@ package io.grpc.xds; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Struct; import io.grpc.Internal; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancerProvider; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; -import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.client.Bootstrapper.ServerInfo; -import java.util.List; import java.util.Map; -import java.util.Objects; -import javax.annotation.Nullable; /** * The provider for the cluster_resolver load balancing policy. This class should not be directly @@ -69,145 +58,4 @@ public LoadBalancer newLoadBalancer(Helper helper) { return new ClusterResolverLoadBalancer(helper); } - static final class ClusterResolverConfig { - // Ordered list of clusters to be resolved. - final List discoveryMechanisms; - // GracefulSwitch configuration - final Object lbConfig; - - ClusterResolverConfig(List discoveryMechanisms, Object lbConfig) { - this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms"); - this.lbConfig = checkNotNull(lbConfig, "lbConfig"); - } - - @Override - public int hashCode() { - return Objects.hash(discoveryMechanisms, lbConfig); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ClusterResolverConfig that = (ClusterResolverConfig) o; - return discoveryMechanisms.equals(that.discoveryMechanisms) - && lbConfig.equals(that.lbConfig); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("discoveryMechanisms", discoveryMechanisms) - .add("lbConfig", lbConfig) - .toString(); - } - - // Describes the mechanism for a specific cluster. - static final class DiscoveryMechanism { - // Name of the cluster to resolve. - final String cluster; - // Type of the cluster. - final Type type; - // Load reporting server info. Null if not enabled. - @Nullable - final ServerInfo lrsServerInfo; - // Cluster-level max concurrent request threshold. Null if not specified. - @Nullable - final Long maxConcurrentRequests; - // TLS context for connections to endpoints in the cluster. - @Nullable - final UpstreamTlsContext tlsContext; - // Resource name for resolving endpoints via EDS. Only valid for EDS clusters. - @Nullable - final String edsServiceName; - // Hostname for resolving endpoints via DNS. Only valid for LOGICAL_DNS clusters. - @Nullable - final String dnsHostName; - @Nullable - final OutlierDetection outlierDetection; - final Map filterMetadata; - - enum Type { - EDS, - LOGICAL_DNS, - } - - private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName, - @Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { - this.cluster = checkNotNull(cluster, "cluster"); - this.type = checkNotNull(type, "type"); - this.edsServiceName = edsServiceName; - this.dnsHostName = dnsHostName; - this.lrsServerInfo = lrsServerInfo; - this.maxConcurrentRequests = maxConcurrentRequests; - this.tlsContext = tlsContext; - this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata")); - this.outlierDetection = outlierDetection; - } - - static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - OutlierDetection outlierDetection) { - return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo, - maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection); - } - - static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { - return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName, - lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); - } - - @Override - public int hashCode() { - return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext, - edsServiceName, dnsHostName, filterMetadata, outlierDetection); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DiscoveryMechanism that = (DiscoveryMechanism) o; - return cluster.equals(that.cluster) - && type == that.type - && Objects.equals(edsServiceName, that.edsServiceName) - && Objects.equals(dnsHostName, that.dnsHostName) - && Objects.equals(lrsServerInfo, that.lrsServerInfo) - && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(tlsContext, that.tlsContext) - && Objects.equals(filterMetadata, that.filterMetadata) - && Objects.equals(outlierDetection, that.outlierDetection); - } - - @Override - public String toString() { - MoreObjects.ToStringHelper toStringHelper = - MoreObjects.toStringHelper(this) - .add("cluster", cluster) - .add("type", type) - .add("edsServiceName", edsServiceName) - .add("dnsHostName", dnsHostName) - .add("lrsServerInfo", lrsServerInfo) - // Exclude tlsContext as its string representation is cumbersome. - .add("maxConcurrentRequests", maxConcurrentRequests) - .add("filterMetadata", filterMetadata) - // Exclude outlierDetection as its string representation is long. - ; - return toStringHelper.toString(); - } - } - } } diff --git a/xds/src/main/java/io/grpc/xds/XdsAttributes.java b/xds/src/main/java/io/grpc/xds/XdsAttributes.java index af8139d8ff4..7dd74fa6802 100644 --- a/xds/src/main/java/io/grpc/xds/XdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/XdsAttributes.java @@ -36,6 +36,13 @@ final class XdsAttributes { static final Attributes.Key> XDS_CLIENT_POOL = Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool"); + /** + * Attribute key for passing around the XdsClient object pool across NameResolver/LoadBalancers. + */ + @NameResolver.ResolutionResultAttr + static final Attributes.Key XDS_CONFIG = + Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig"); + /** * Attribute key for obtaining the global provider that provides atomics for aggregating * outstanding RPCs sent to each cluster. diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index 999ee0d4b0c..f2f5ffcea27 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -181,7 +181,6 @@ XdsConfigBuilder setVirtualHost(VirtualHost virtualHost) { XdsConfig build() { checkNotNull(listener, "listener"); - checkNotNull(route, "route"); return new XdsConfig(listener, route, clusters, virtualHost); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index e0339b6b99a..fe8c4f14c27 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -252,6 +252,18 @@ private void maybePublishConfig() { return; } + // Ensure that LDS is valid. If not, onError or onResourceDoesNotExist should have been called + TypeWatchers ldsWatcher = resourceWatchers.get(XdsListenerResource.getInstance()); + TypeWatchers rdsWatcher = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (ldsWatcher == null || ldsWatcher.watchers.isEmpty()) { + return; + } + XdsWatcherBase lds = ldsWatcher.watchers.values().iterator().next(); + if (lds == null || !lds.hasDataValue()) { + return; + } + + lastXdsConfig = buildConfig(); xdsConfigWatcher.onUpdate(lastXdsConfig); } @@ -267,7 +279,7 @@ private XdsConfig buildConfig() { resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) { XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue(); builder.setListener(ldsUpdate); - if (activeVirtualHost == null) { + if (activeVirtualHost == null && ldsUpdate.httpConnectionManager().virtualHosts() != null) { activeVirtualHost = RoutingUtils.findVirtualHostForHostName( ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority); } @@ -279,16 +291,23 @@ private XdsConfig buildConfig() { } } - resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream() - .map(watcher -> (RdsWatcher) watcher) - .forEach(watcher -> builder.setRoute(watcher.getData().getValue())); + if (resourceWatchers.get(XdsRouteConfigureResource.getInstance()) != null) { + resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream() + .map(watcher -> (RdsWatcher) watcher) + .forEach(watcher -> builder.setRoute(watcher.getData().getValue())); + } - builder.setVirtualHost(activeVirtualHost); + if (activeVirtualHost != null) { + builder.setVirtualHost(activeVirtualHost); + } + TypeWatchers edsTypeWatchers = resourceWatchers.get(ENDPOINT_RESOURCE); Map> edsWatchers = - resourceWatchers.get(ENDPOINT_RESOURCE).watchers; + (edsTypeWatchers != null) ? edsTypeWatchers.watchers : Collections.EMPTY_MAP; + + TypeWatchers cdsTypeWatchers = resourceWatchers.get(CLUSTER_RESOURCE); Map> cdsWatchers = - resourceWatchers.get(CLUSTER_RESOURCE).watchers; + (cdsTypeWatchers != null) ? cdsTypeWatchers.watchers : Collections.EMPTY_MAP; // Iterate CDS watchers for (XdsWatcherBase watcher : cdsWatchers.values()) { @@ -462,6 +481,7 @@ public void onError(Status error) { @Override public void onResourceDoesNotExist(String resourceName) { handleDoesNotExist(resourceName); + cleanUpRdsWatcher(); xdsConfigWatcher.onResourceDoesNotExist(toContextString()); } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 3c7f4455fde..1a54530280c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -60,11 +60,10 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; -import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.Bootstrapper.AuthorityInfo; import io.grpc.xds.client.Bootstrapper.BootstrapInfo; +import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; import java.net.URI; @@ -99,6 +98,9 @@ final class XdsNameResolver extends NameResolver { CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY"); static final CallOptions.Key AUTO_HOST_REWRITE_KEY = CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY"); + // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode + // to an empty locality. + static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", ""); @VisibleForTesting static boolean enableTimeout = Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT")) @@ -133,7 +135,7 @@ final class XdsNameResolver extends NameResolver { private ObjectPool xdsClientPool; private XdsClient xdsClient; private CallCounterProvider callCounterProvider; - private ResolveState resolveState; + private ResolveState2 resolveState2; // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in // XdsClient instead of here. private boolean receivedConfig; @@ -224,9 +226,8 @@ public void start(Listener2 listener) { } ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName); callCounterProvider = SharedCallCounterMap.getInstance(); - resolveState = new ResolveState(ldsResourceName); - resolveState.start(); + resolveState2 = new ResolveState2(ldsResourceName); // auto starts } private static String expandPercentS(String template, String replacement) { @@ -236,8 +237,8 @@ private static String expandPercentS(String template, String replacement) { @Override public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); - if (resolveState != null) { - resolveState.stop(); + if (resolveState2 != null) { + resolveState2.shutdown(); } if (xdsClient != null) { xdsClient = xdsClientPool.returnObject(xdsClient); @@ -308,6 +309,7 @@ private void updateResolutionResult() { Attributes attrs = Attributes.newBuilder() .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) + .set(XdsAttributes.XDS_CONFIG, resolveState2.lastConfig) .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider) .set(InternalConfigSelector.KEY, configSelector) .build(); @@ -638,82 +640,69 @@ public Result selectConfig(PickSubchannelArgs args) { } } - private class ResolveState implements ResourceWatcher { + class ResolveState2 implements XdsDependencyManager.XdsConfigWatcher { private final ConfigOrError emptyServiceConfig = serviceConfigParser.parseServiceConfig(Collections.emptyMap()); - private final String ldsResourceName; private boolean stopped; + private final XdsDependencyManager xdsDependencyManager; @Nullable private Set existingClusters; // clusters to which new requests can be routed - @Nullable - private RouteDiscoveryState routeDiscoveryState; + private XdsConfig lastConfig; + private final String authority; - ResolveState(String ldsResourceName) { - this.ldsResourceName = ldsResourceName; + private ResolveState2(String ldsResourceName) { + authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority; + xdsDependencyManager = + new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName ); } - @Override - public void onChanged(final XdsListenerResource.LdsUpdate update) { + private void shutdown() { if (stopped) { return; } - logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); - HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); - List virtualHosts = httpConnectionManager.virtualHosts(); - String rdsName = httpConnectionManager.rdsName(); - cleanUpRouteDiscoveryState(); - if (virtualHosts != null) { - updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(), - httpConnectionManager.httpFilterConfigs()); - } else { - routeDiscoveryState = new RouteDiscoveryState( - rdsName, httpConnectionManager.httpMaxStreamDurationNano(), - httpConnectionManager.httpFilterConfigs()); - logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), - rdsName, routeDiscoveryState, syncContext); + + stopped = true; + xdsDependencyManager.shutdown(); + } + + @Override + public void onUpdate(XdsConfig update) { + if (stopped) { + return; } + logger.log(XdsLogLevel.INFO, "Receive XDS resource update: {0}", update); + lastConfig = update; + + // Process Route + HttpConnectionManager httpConnectionManager = update.getListener().httpConnectionManager(); + VirtualHost virtualHost = update.getVirtualHost(); // httpConnectionManager.virtualHosts(); + + updateRoutes(virtualHost, httpConnectionManager.httpMaxStreamDurationNano(), + httpConnectionManager.httpFilterConfigs()); } @Override - public void onError(final Status error) { + public void onError(String resourceContext, Status error) { if (stopped || receivedConfig) { return; } listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load LDS %s. xDS server returned: %s: %s", - ldsResourceName, error.getCode(), error.getDescription()))); + String.format("Unable to load %s. xDS server returned: %s: %s", + resourceContext, error.getCode(), error.getDescription()))); } @Override - public void onResourceDoesNotExist(final String resourceName) { + public void onResourceDoesNotExist(String resourceContext) { if (stopped) { return; } - String error = "LDS resource does not exist: " + resourceName; + String error = "Resource does not exist: " + resourceContext; logger.log(XdsLogLevel.INFO, error); - cleanUpRouteDiscoveryState(); cleanUpRoutes(error); } - private void start() { - logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(), - ldsResourceName, this, syncContext); - } - - private void stop() { - logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName); - stopped = true; - cleanUpRouteDiscoveryState(); - xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this); - } - - // called in syncContext - private void updateRoutes(List virtualHosts, long httpMaxStreamDurationNano, - @Nullable List filterConfigs) { - String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority; - VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority); + private void updateRoutes(@Nullable VirtualHost virtualHost, long httpMaxStreamDurationNano, + @Nullable List filterConfigs) { if (virtualHost == null) { String error = "Failed to find virtual host matching hostname: " + authority; logger.log(XdsLogLevel.WARNING, error); @@ -721,7 +710,7 @@ private void updateRoutes(List virtualHosts, long httpMaxStreamDura return; } - List routes = virtualHost.routes(); + List routes = virtualHost.routes() ; // Populate all clusters to which requests can be routed to through the virtual host. Set clusters = new HashSet<>(); @@ -761,7 +750,7 @@ private void updateRoutes(List virtualHosts, long httpMaxStreamDura existingClusters == null ? clusters : Sets.difference(clusters, existingClusters); Set deletedClusters = existingClusters == null - ? Collections.emptySet() : Sets.difference(existingClusters, clusters); + ? Collections.emptySet() : Sets.difference(existingClusters, clusters); existingClusters = clusters; for (String cluster : addedClusters) { if (clusterRefs.containsKey(cluster)) { @@ -796,10 +785,10 @@ private void updateRoutes(List virtualHosts, long httpMaxStreamDura } // Make newly added clusters selectable by config selector and deleted clusters no longer // selectable. - routingConfig = - new RoutingConfig( - httpMaxStreamDurationNano, routes, filterConfigs, - virtualHost.filterConfigOverrides()); + ImmutableMap filterConfigOverrides = + (virtualHost != null) ? virtualHost.filterConfigOverrides() : ImmutableMap.of(); + routingConfig = new RoutingConfig( + httpMaxStreamDurationNano, routes, filterConfigs, filterConfigOverrides); shouldUpdateResult = false; for (String cluster : deletedClusters) { int count = clusterRefs.get(cluster).refCount.decrementAndGet(); @@ -832,70 +821,13 @@ private void cleanUpRoutes(String error) { error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId(); listener.onResult(ResolutionResult.newBuilder() .setAttributes(Attributes.newBuilder() - .set(InternalConfigSelector.KEY, - new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId))) - .build()) + .set(InternalConfigSelector.KEY, + new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId))) + .build()) .setServiceConfig(emptyServiceConfig) .build()); receivedConfig = true; } - - private void cleanUpRouteDiscoveryState() { - if (routeDiscoveryState != null) { - String rdsName = routeDiscoveryState.resourceName; - logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName); - xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName, - routeDiscoveryState); - routeDiscoveryState = null; - } - } - - /** - * Discovery state for RouteConfiguration resource. One instance for each Listener resource - * update. - */ - private class RouteDiscoveryState implements ResourceWatcher { - private final String resourceName; - private final long httpMaxStreamDurationNano; - @Nullable - private final List filterConfigs; - - private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano, - @Nullable List filterConfigs) { - this.resourceName = resourceName; - this.httpMaxStreamDurationNano = httpMaxStreamDurationNano; - this.filterConfigs = filterConfigs; - } - - @Override - public void onChanged(final RdsUpdate update) { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); - updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs); - } - - @Override - public void onError(final Status error) { - if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load RDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription()))); - } - - @Override - public void onResourceDoesNotExist(final String resourceName) { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - String error = "RDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRoutes(error); - } - } } /** diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 82a61e79abf..8cdd32ed535 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -47,11 +47,12 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; +import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.ObjectPool; import io.grpc.util.GracefulSwitchLoadBalancerAccessor; +import io.grpc.xds.CdsLoadBalancer2.ClusterResolverConfig; +import io.grpc.xds.CdsLoadBalancer2.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; @@ -153,7 +154,8 @@ public void setUp() { new FakeLoadBalancerProvider("ring_hash_experimental", new RingHashLoadBalancerProvider())); lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental", new LeastRequestLoadBalancerProvider())); - loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); + loadBalancer = + new CdsLoadBalancer2(helper, lbRegistry, new ExponentialBackoffPolicy.Provider()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(Collections.emptyList()) diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 9243abba6d3..8f260d1b2d4 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -62,9 +62,9 @@ import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.util.GracefulSwitchLoadBalancerAccessor; import io.grpc.util.OutlierDetectionLoadBalancerProvider; +import io.grpc.xds.CdsLoadBalancer2.ClusterResolverConfig; +import io.grpc.xds.CdsLoadBalancer2.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; -import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -137,13 +137,13 @@ public class ClusterResolverLoadBalancerTest { FailurePercentageEjection.create(100, 100, 100, 100)); private final DiscoveryMechanism edsDiscoveryMechanism1 = DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - Collections.emptyMap(), null); + Collections.emptyMap(), null, null); private final DiscoveryMechanism edsDiscoveryMechanism2 = DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, tlsContext, - Collections.emptyMap(), null); + Collections.emptyMap(), null, null); private final DiscoveryMechanism edsDiscoveryMechanismWithOutlierDetection = DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - Collections.emptyMap(), outlierDetection); + Collections.emptyMap(), outlierDetection, null); private final DiscoveryMechanism logicalDnsDiscoveryMechanism = DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null, Collections.emptyMap()); diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index d895cecdb10..3a1d04a1214 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -86,6 +87,8 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher; +import io.grpc.xds.XdsClusterResource.CdsUpdate; +import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.Bootstrapper.AuthorityInfo; @@ -101,14 +104,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; @@ -385,6 +391,7 @@ public void resolving_ldsResourceUpdateRdsName() { Collections.singletonList(route1), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -439,6 +446,7 @@ public void resolving_ldsResourceRevokedAndAddedBack() { Collections.singletonList(route), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -504,7 +512,7 @@ public void resolving_encounterErrorLdsWatcherOnly() { verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY + assertThat(error.getDescription()).isEqualTo("Unable to load LDS resource: " + AUTHORITY + ". xDS server returned: UNAVAILABLE: server unreachable"); } @@ -516,7 +524,7 @@ public void resolving_translateErrorLds() { verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY + assertThat(error.getDescription()).isEqualTo("Unable to load LDS resource: " + AUTHORITY + ". xDS server returned: NOT_FOUND: server unreachable"); assertThat(error.getCause()).isNull(); } @@ -530,11 +538,11 @@ public void resolving_encounterErrorLdsAndRdsWatchers() { verify(mockListener, times(2)).onError(errorCaptor.capture()); Status error = errorCaptor.getAllValues().get(0); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY + assertThat(error.getDescription()).isEqualTo("Unable to load LDS resource: " + AUTHORITY + ". xDS server returned: UNAVAILABLE: server unreachable"); error = errorCaptor.getAllValues().get(1); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load RDS " + RDS_RESOURCE_NAME + assertThat(error.getDescription()).isEqualTo("Unable to load RDS resource: " + RDS_RESOURCE_NAME + ". xDS server returned: UNAVAILABLE: server unreachable"); } @@ -557,6 +565,7 @@ public void resolving_matchingVirtualHostNotFound_matchingOverrideAuthority() { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -580,7 +589,8 @@ public void resolving_matchingVirtualHostNotFound_notMatchingOverrideAuthority() metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); + xdsClient.deliverLdsUpdateOnly(0L, Arrays.asList(virtualHost)); + fakeClock.forwardTime(15, TimeUnit.SECONDS); assertEmptyResolutionResult("random"); } @@ -1162,6 +1172,18 @@ public void resolved_simpleCallSucceeds_routeToWeightedCluster() { assertCallSelectClusterResult(call1, configSelector, cluster1, 20.0); } + /** Creates and delivers both CDS and EDS updates for the given clusters. */ + private static void createAndDeliverClusterUpdates(FakeXdsClient xdsClient, String... clusterNames) { + for (String clusterName : clusterNames) { + CdsUpdate.Builder forEds = CdsUpdate.forEds(clusterName, clusterName, null, null, null, null) + .roundRobinLbPolicy(); + xdsClient.deliverCdsUpdate(clusterName, forEds.build()); + EdsUpdate edsUpdate = new EdsUpdate(clusterName, + XdsTestUtils.createMinimalLbEndpointsMap("host"), Collections.emptyList()); + xdsClient.deliverEdsUpdate(clusterName, edsUpdate); + } + } + @Test public void resolved_simpleCallSucceeds_routeToRls() { when(mockRandom.nextInt(anyInt())).thenReturn(90, 10); @@ -1417,6 +1439,7 @@ public void generateServiceConfig_forClusterManagerLoadBalancingConfig() throws ImmutableList.of(route1, route2, route3), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, "cluster-foo", "cluster-bar", "cluster-baz"); verify(mockListener).onResult(resolutionResultCaptor.capture()); String expectedServiceConfigJson = @@ -2041,6 +2064,8 @@ private class FakeXdsClient extends XdsClient { private String rdsResource; private ResourceWatcher ldsWatcher; private ResourceWatcher rdsWatcher; + private final Map>> cdsWatchers = new HashMap<>(); + private final Map>> edsWatchers = new HashMap<>(); @Override public BootstrapInfo getBootstrapInfo() { @@ -2068,6 +2093,14 @@ public void watchXdsResource(XdsResourceType resou rdsResource = resourceName; rdsWatcher = (ResourceWatcher) watcher; break; + case "CDS": + cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) + .add((ResourceWatcher) watcher); + break; + case "EDS": + edsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) + .add((ResourceWatcher) watcher); + break; default: } } @@ -2090,14 +2123,36 @@ public void cancelXdsResourceWatch(XdsResourceType rdsResource = null; rdsWatcher = null; break; + case "CDS": + assertThat(cdsWatchers).containsKey(resourceName); + assertThat(cdsWatchers.get(resourceName)).contains(watcher); + cdsWatchers.get(resourceName).remove((ResourceWatcher) watcher); + break; + case "EDS": + assertThat(edsWatchers).containsKey(resourceName); + assertThat(edsWatchers.get(resourceName)).contains(watcher); + edsWatchers.get(resourceName).remove((ResourceWatcher) watcher); + break; default: } } + void deliverLdsUpdateOnly(long httpMaxStreamDurationNano, List virtualHosts) { + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + httpMaxStreamDurationNano, virtualHosts, null))); + }); + } void deliverLdsUpdate(long httpMaxStreamDurationNano, List virtualHosts) { + List clusterNames = new ArrayList<>(); + for (VirtualHost vh : virtualHosts) { + clusterNames.addAll(getClusterNames(vh.routes())); + } + syncContext.execute(() -> { ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( httpMaxStreamDurationNano, virtualHosts, null))); + createAndDeliverClusterUpdates(this, clusterNames.toArray(new String[0])); }); } @@ -2106,9 +2161,14 @@ void deliverLdsUpdate(final List routes) { VirtualHost.create( "virtual-host", Collections.singletonList(expectedLdsResourceName), routes, ImmutableMap.of()); + List clusterNames = getClusterNames(routes); + syncContext.execute(() -> { ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( 0L, Collections.singletonList(virtualHost), null))); + if (!clusterNames.isEmpty()) { + createAndDeliverClusterUpdates(this, clusterNames.toArray(new String[0])); + } }); } @@ -2157,6 +2217,7 @@ void deliverLdsUpdateWithFaultInjection( syncContext.execute(() -> { ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( 0L, Collections.singletonList(virtualHost), filterChain))); + createAndDeliverClusterUpdates(this, cluster); }); } @@ -2188,6 +2249,29 @@ void deliverLdsResourceNotFound() { }); } + private List getClusterNames(List routes) { + List clusterNames = new ArrayList<>(); + for (Route r : routes) { + if (r.routeAction() == null) { + continue; + } + String cluster = r.routeAction().cluster(); + if (cluster != null) { + clusterNames.add(cluster); + } else { + List weightedClusters = r.routeAction().weightedClusters(); + if (weightedClusters == null) { + continue; + } + for (ClusterWeight wc : weightedClusters) { + clusterNames.add(wc.name()); + } + } + } + + return clusterNames; + } + void deliverRdsUpdateWithFaultInjection( String resourceName, @Nullable FaultConfig virtualHostFaultConfig, @Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) { @@ -2224,6 +2308,7 @@ void deliverRdsUpdateWithFaultInjection( overrideConfig); syncContext.execute(() -> { rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); + createAndDeliverClusterUpdates(this, cluster1); }); } @@ -2245,6 +2330,39 @@ void deliverRdsResourceNotFound(String resourceName) { }); } + private void deliverCdsUpdate(String clusterName, CdsUpdate update) { + if (!cdsWatchers.containsKey(clusterName)) { + return; + } + syncContext.execute(() -> { + List> resourceWatchers = + ImmutableList.copyOf(cdsWatchers.get(clusterName)); + resourceWatchers.forEach(w -> w.onChanged(update)); + }); + } + + private void deliverCdsResourceNotExist(String clusterName) { + if (!cdsWatchers.containsKey(clusterName)) { + return; + } + syncContext.execute(() -> { + ImmutableList.copyOf(cdsWatchers.get(clusterName)) + .forEach(w -> w.onResourceDoesNotExist(clusterName)); + }); + } + + private void deliverEdsUpdate(String name, EdsUpdate update) { + syncContext.execute(() -> { + if (!edsWatchers.containsKey(name)) { + return; + } + List> resourceWatchers = + ImmutableList.copyOf(edsWatchers.get(name)); + resourceWatchers.forEach(w -> w.onChanged(update)); + }); + } + + void deliverError(final Status error) { if (ldsWatcher != null) { syncContext.execute(() -> { @@ -2256,6 +2374,11 @@ void deliverError(final Status error) { rdsWatcher.onError(error); }); } + syncContext.execute(() -> { + cdsWatchers.values().stream() + .flatMap(List::stream) + .forEach(w -> w.onError(error)); + }); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index 7f5ec0b27c6..67e0acd28a7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -255,12 +255,7 @@ static XdsConfig getDefaultXdsConfig(String serverHostName) VirtualHost virtualHost = rdsUpdate.virtualHosts.get(0); // Need to create endpoints to create locality endpoints map to create edsUpdate - Map lbEndpointsMap = new HashMap<>(); - LbEndpoint lbEndpoint = - LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME); - lbEndpointsMap.put( - Locality.create("", "", ""), - LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0)); + Map lbEndpointsMap = createMinimalLbEndpointsMap(serverHostName); // Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate( @@ -280,6 +275,16 @@ static XdsConfig getDefaultXdsConfig(String serverHostName) return builder.build(); } + static Map createMinimalLbEndpointsMap(String serverHostName) { + Map lbEndpointsMap = new HashMap<>(); + LbEndpoint lbEndpoint = + LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME); + lbEndpointsMap.put( + Locality.create("", "", ""), + LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0)); + return lbEndpointsMap; + } + @SuppressWarnings("unchecked") private static ImmutableMap getWrrLbConfigAsMap() throws IOException { String lbConfigStr = "{\"wrr_locality_experimental\" : "