Skip to content

Commit

Permalink
Much progress in A74 using DependencyManager
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Feb 22, 2025
1 parent 9efd2f2 commit e6503ff
Show file tree
Hide file tree
Showing 13 changed files with 1,851 additions and 1,318 deletions.
1,210 changes: 1,066 additions & 144 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Large diffs are not rendered by default.

780 changes: 2 additions & 778 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Large diffs are not rendered by default.

152 changes: 0 additions & 152 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,13 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;

/**
* The provider for the cluster_resolver load balancing policy. This class should not be directly
Expand Down Expand Up @@ -69,145 +58,4 @@ public LoadBalancer newLoadBalancer(Helper helper) {
return new ClusterResolverLoadBalancer(helper);
}

static final class ClusterResolverConfig {
// Ordered list of clusters to be resolved.
final List<DiscoveryMechanism> discoveryMechanisms;
// GracefulSwitch configuration
final Object lbConfig;

ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
}

@Override
public int hashCode() {
return Objects.hash(discoveryMechanisms, lbConfig);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClusterResolverConfig that = (ClusterResolverConfig) o;
return discoveryMechanisms.equals(that.discoveryMechanisms)
&& lbConfig.equals(that.lbConfig);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("discoveryMechanisms", discoveryMechanisms)
.add("lbConfig", lbConfig)
.toString();
}

// Describes the mechanism for a specific cluster.
static final class DiscoveryMechanism {
// Name of the cluster to resolve.
final String cluster;
// Type of the cluster.
final Type type;
// Load reporting server info. Null if not enabled.
@Nullable
final ServerInfo lrsServerInfo;
// Cluster-level max concurrent request threshold. Null if not specified.
@Nullable
final Long maxConcurrentRequests;
// TLS context for connections to endpoints in the cluster.
@Nullable
final UpstreamTlsContext tlsContext;
// Resource name for resolving endpoints via EDS. Only valid for EDS clusters.
@Nullable
final String edsServiceName;
// Hostname for resolving endpoints via DNS. Only valid for LOGICAL_DNS clusters.
@Nullable
final String dnsHostName;
@Nullable
final OutlierDetection outlierDetection;
final Map<String, Struct> filterMetadata;

enum Type {
EDS,
LOGICAL_DNS,
}

private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName,
@Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
this.cluster = checkNotNull(cluster, "cluster");
this.type = checkNotNull(type, "type");
this.edsServiceName = edsServiceName;
this.dnsHostName = dnsHostName;
this.lrsServerInfo = lrsServerInfo;
this.maxConcurrentRequests = maxConcurrentRequests;
this.tlsContext = tlsContext;
this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata"));
this.outlierDetection = outlierDetection;
}

static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
OutlierDetection outlierDetection) {
return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo,
maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection);
}

static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName,
lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
}

@Override
public int hashCode() {
return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext,
edsServiceName, dnsHostName, filterMetadata, outlierDetection);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DiscoveryMechanism that = (DiscoveryMechanism) o;
return cluster.equals(that.cluster)
&& type == that.type
&& Objects.equals(edsServiceName, that.edsServiceName)
&& Objects.equals(dnsHostName, that.dnsHostName)
&& Objects.equals(lrsServerInfo, that.lrsServerInfo)
&& Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
&& Objects.equals(tlsContext, that.tlsContext)
&& Objects.equals(filterMetadata, that.filterMetadata)
&& Objects.equals(outlierDetection, that.outlierDetection);
}

@Override
public String toString() {
MoreObjects.ToStringHelper toStringHelper =
MoreObjects.toStringHelper(this)
.add("cluster", cluster)
.add("type", type)
.add("edsServiceName", edsServiceName)
.add("dnsHostName", dnsHostName)
.add("lrsServerInfo", lrsServerInfo)
// Exclude tlsContext as its string representation is cumbersome.
.add("maxConcurrentRequests", maxConcurrentRequests)
.add("filterMetadata", filterMetadata)
// Exclude outlierDetection as its string representation is long.
;
return toStringHelper.toString();
}
}
}
}
4 changes: 4 additions & 0 deletions xds/src/main/java/io/grpc/xds/RoutingUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ private RoutingUtils() {
*/
@Nullable
static VirtualHost findVirtualHostForHostName(List<VirtualHost> virtualHosts, String hostName) {
if (virtualHosts == null || virtualHosts.isEmpty()) {
return null;
}

// Domain search order:
// 1. Exact domain names: ``www.foo.com``.
// 2. Suffix domain wildcards: ``*.foo.com`` or ``*-bar.foo.com``.
Expand Down
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ final class XdsAttributes {
static final Attributes.Key<ObjectPool<XdsClient>> XDS_CLIENT_POOL =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool");

/**
* Attribute key for passing around the XdsClient object pool across NameResolver/LoadBalancers.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<XdsConfig> XDS_CONFIG =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig");

/**
* Attribute key for obtaining the global provider that provides atomics for aggregating
* outstanding RPCs sent to each cluster.
Expand Down
16 changes: 15 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ public ImmutableMap<String, StatusOr<XdsClusterConfig>> getClusters() {
return clusters;
}

public XdsConfigBuilder toBuilder() {
XdsConfigBuilder builder = new XdsConfigBuilder()
.setVirtualHost(getVirtualHost())
.setRoute(getRoute())
.setListener(getListener());

if (clusters != null) {
for (Map.Entry<String, StatusOr<XdsClusterConfig>> entry : clusters.entrySet()) {
builder.addCluster(entry.getKey(), entry.getValue());
}
}

return builder;
}

static final class XdsClusterConfig {
private final String clusterName;
private final CdsUpdate clusterResource;
Expand Down Expand Up @@ -181,7 +196,6 @@ XdsConfigBuilder setVirtualHost(VirtualHost virtualHost) {

XdsConfig build() {
checkNotNull(listener, "listener");
checkNotNull(route, "route");
return new XdsConfig(listener, route, clusters, virtualHost);
}
}
Expand Down
69 changes: 52 additions & 17 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,24 @@ XdsConfig buildConfig() {
}
}

resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
if (resourceWatchers.containsKey(XdsRouteConfigureResource.getInstance())) {
resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
}

builder.setVirtualHost(activeVirtualHost);
if (activeVirtualHost != null) {
builder.setVirtualHost(activeVirtualHost);
}

Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
resourceWatchers.containsKey(ENDPOINT_RESOURCE)
? resourceWatchers.get(ENDPOINT_RESOURCE).watchers
: Collections.EMPTY_MAP;
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
resourceWatchers.get(CLUSTER_RESOURCE).watchers;
resourceWatchers.containsKey(CLUSTER_RESOURCE)
? resourceWatchers.get(CLUSTER_RESOURCE).watchers
: Collections.EMPTY_MAP;

// Iterate CDS watchers
for (XdsWatcherBase<?> watcher : cdsWatchers.values()) {
Expand Down Expand Up @@ -450,28 +458,39 @@ public void onChanged(XdsListenerResource.LdsUpdate update) {

if (virtualHosts != null) {
// No RDS watcher since we are getting RDS updates via LDS
updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
boolean updateSuccessful = updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
this.rdsName = null;
if (!updateSuccessful) {
lastXdsConfig = null;
return;
}

} else if (changedRdsName) {
cleanUpRdsWatcher();
lastXdsConfig = null;
this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName));
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
}

setData(update);
maybePublishConfig();
if (virtualHosts != null || changedRdsName) {
maybePublishConfig();
}
}

@Override
public void onError(Status error) {
super.onError(checkNotNull(error, "error"));
lastXdsConfig = null; //When we get a good update, we will publish it
xdsConfigWatcher.onError(toContextString(), error);
}

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
cleanUpRdsWatcher();
rdsName = null;
lastXdsConfig = null; // Publishing an empty result
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}

Expand Down Expand Up @@ -518,20 +537,23 @@ public void onChanged(RdsUpdate update) {
? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority)
: null;
setData(update);
updateRoutes(update.virtualHosts, this, oldVirtualHost, true);
maybePublishConfig();
if (updateRoutes(update.virtualHosts, this, oldVirtualHost, true)) {
maybePublishConfig();
}
}

@Override
public void onError(Status error) {
super.onError(checkNotNull(error, "error"));
xdsConfigWatcher.onError(toContextString(), error);
lastXdsConfig = null; // will publish when we get a good update
}

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
lastXdsConfig = null; // Published an empty result
}

ImmutableList<String> getCdsNames() {
Expand All @@ -557,6 +579,12 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
switch (update.clusterType()) {
case EDS:
setData(update);
if (update.edsServiceName() == null) {
Status error = Status.UNAVAILABLE.withDescription("EDS cluster missing edsServiceName");
setDataAsStatus(error);
maybePublishConfig();
return;
}
if (!addEdsWatcher(update.edsServiceName(), this)) {
maybePublishConfig();
}
Expand All @@ -577,8 +605,13 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
setDataAsStatus(error);
}
if (hasDataValue()) {
Set<String> oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames());
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
ImmutableList<String> oldChildNames = getData().getValue().prioritizedClusterNames();
Set<String> oldNames = oldChildNames != null
? new HashSet<>(oldChildNames)
: new HashSet<>();
ImmutableList<String> newChildNames = update.prioritizedClusterNames();
Set<String> newNames =
newChildNames != null ? new HashSet<>(newChildNames) : new HashSet<>();


Set<String> deletedClusters = Sets.difference(oldNames, newNames);
Expand Down Expand Up @@ -670,17 +703,17 @@ void addParentContext(CdsWatcher parentContext) {
}
}

private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
private boolean updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
VirtualHost oldVirtualHost, boolean sameParentContext) {
VirtualHost virtualHost =
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
if (virtualHost == null) {
String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
logger.log(XdsLogger.XdsLogLevel.WARNING, error);
cleanUpRoutes();
xdsConfigWatcher.onError(
"xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error));
return;
Status errorStatus = Status.UNAVAILABLE.withDescription(error);
xdsConfigWatcher.onError("xDS node ID:" + dataPlaneAuthority, errorStatus);
return false;
}

Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
Expand All @@ -697,6 +730,8 @@ private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContex
} else {
newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
}

return true;
}

private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
Expand Down
Loading

0 comments on commit e6503ff

Please sign in to comment.