Skip to content

Commit

Permalink
Checkpoint almost everything works
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Jan 31, 2025
1 parent e4e81fb commit 5422bfd
Show file tree
Hide file tree
Showing 11 changed files with 1,283 additions and 1,200 deletions.
1,169 changes: 1,046 additions & 123 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Large diffs are not rendered by default.

780 changes: 2 additions & 778 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Large diffs are not rendered by default.

152 changes: 0 additions & 152 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,13 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;

/**
* The provider for the cluster_resolver load balancing policy. This class should not be directly
Expand Down Expand Up @@ -69,145 +58,4 @@ public LoadBalancer newLoadBalancer(Helper helper) {
return new ClusterResolverLoadBalancer(helper);
}

static final class ClusterResolverConfig {
// Ordered list of clusters to be resolved.
final List<DiscoveryMechanism> discoveryMechanisms;
// GracefulSwitch configuration
final Object lbConfig;

ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
}

@Override
public int hashCode() {
return Objects.hash(discoveryMechanisms, lbConfig);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClusterResolverConfig that = (ClusterResolverConfig) o;
return discoveryMechanisms.equals(that.discoveryMechanisms)
&& lbConfig.equals(that.lbConfig);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("discoveryMechanisms", discoveryMechanisms)
.add("lbConfig", lbConfig)
.toString();
}

// Describes the mechanism for a specific cluster.
static final class DiscoveryMechanism {
// Name of the cluster to resolve.
final String cluster;
// Type of the cluster.
final Type type;
// Load reporting server info. Null if not enabled.
@Nullable
final ServerInfo lrsServerInfo;
// Cluster-level max concurrent request threshold. Null if not specified.
@Nullable
final Long maxConcurrentRequests;
// TLS context for connections to endpoints in the cluster.
@Nullable
final UpstreamTlsContext tlsContext;
// Resource name for resolving endpoints via EDS. Only valid for EDS clusters.
@Nullable
final String edsServiceName;
// Hostname for resolving endpoints via DNS. Only valid for LOGICAL_DNS clusters.
@Nullable
final String dnsHostName;
@Nullable
final OutlierDetection outlierDetection;
final Map<String, Struct> filterMetadata;

enum Type {
EDS,
LOGICAL_DNS,
}

private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName,
@Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
this.cluster = checkNotNull(cluster, "cluster");
this.type = checkNotNull(type, "type");
this.edsServiceName = edsServiceName;
this.dnsHostName = dnsHostName;
this.lrsServerInfo = lrsServerInfo;
this.maxConcurrentRequests = maxConcurrentRequests;
this.tlsContext = tlsContext;
this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata"));
this.outlierDetection = outlierDetection;
}

static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
OutlierDetection outlierDetection) {
return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo,
maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection);
}

static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName,
lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
}

@Override
public int hashCode() {
return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext,
edsServiceName, dnsHostName, filterMetadata, outlierDetection);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DiscoveryMechanism that = (DiscoveryMechanism) o;
return cluster.equals(that.cluster)
&& type == that.type
&& Objects.equals(edsServiceName, that.edsServiceName)
&& Objects.equals(dnsHostName, that.dnsHostName)
&& Objects.equals(lrsServerInfo, that.lrsServerInfo)
&& Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
&& Objects.equals(tlsContext, that.tlsContext)
&& Objects.equals(filterMetadata, that.filterMetadata)
&& Objects.equals(outlierDetection, that.outlierDetection);
}

@Override
public String toString() {
MoreObjects.ToStringHelper toStringHelper =
MoreObjects.toStringHelper(this)
.add("cluster", cluster)
.add("type", type)
.add("edsServiceName", edsServiceName)
.add("dnsHostName", dnsHostName)
.add("lrsServerInfo", lrsServerInfo)
// Exclude tlsContext as its string representation is cumbersome.
.add("maxConcurrentRequests", maxConcurrentRequests)
.add("filterMetadata", filterMetadata)
// Exclude outlierDetection as its string representation is long.
;
return toStringHelper.toString();
}
}
}
}
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ final class XdsAttributes {
static final Attributes.Key<ObjectPool<XdsClient>> XDS_CLIENT_POOL =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool");

/**
* Attribute key for passing around the XdsClient object pool across NameResolver/LoadBalancers.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<XdsConfig> XDS_CONFIG =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig");

/**
* Attribute key for obtaining the global provider that provides atomics for aggregating
* outstanding RPCs sent to each cluster.
Expand Down
1 change: 0 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ XdsConfigBuilder setVirtualHost(VirtualHost virtualHost) {

XdsConfig build() {
checkNotNull(listener, "listener");
checkNotNull(route, "route");
return new XdsConfig(listener, route, clusters, virtualHost);
}
}
Expand Down
34 changes: 27 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ private void maybePublishConfig() {
return;
}

// Ensure that LDS is valid. If not, onError or onResourceDoesNotExist should have been called
TypeWatchers<?> ldsWatcher = resourceWatchers.get(XdsListenerResource.getInstance());
TypeWatchers<?> rdsWatcher = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (ldsWatcher == null || ldsWatcher.watchers.isEmpty()) {
return;
}
XdsWatcherBase<?> lds = ldsWatcher.watchers.values().iterator().next();
if (lds == null || !lds.hasDataValue()) {
return;
}


lastXdsConfig = buildConfig();
xdsConfigWatcher.onUpdate(lastXdsConfig);
}
Expand All @@ -267,7 +279,7 @@ private XdsConfig buildConfig() {
resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) {
XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue();
builder.setListener(ldsUpdate);
if (activeVirtualHost == null) {
if (activeVirtualHost == null && ldsUpdate.httpConnectionManager().virtualHosts() != null) {
activeVirtualHost = RoutingUtils.findVirtualHostForHostName(
ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority);
}
Expand All @@ -279,16 +291,23 @@ private XdsConfig buildConfig() {
}
}

resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
if (resourceWatchers.get(XdsRouteConfigureResource.getInstance()) != null) {
resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
}

builder.setVirtualHost(activeVirtualHost);
if (activeVirtualHost != null) {
builder.setVirtualHost(activeVirtualHost);
}

TypeWatchers<?> edsTypeWatchers = resourceWatchers.get(ENDPOINT_RESOURCE);
Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
(edsTypeWatchers != null) ? edsTypeWatchers.watchers : Collections.EMPTY_MAP;

TypeWatchers<?> cdsTypeWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
resourceWatchers.get(CLUSTER_RESOURCE).watchers;
(cdsTypeWatchers != null) ? cdsTypeWatchers.watchers : Collections.EMPTY_MAP;

// Iterate CDS watchers
for (XdsWatcherBase<?> watcher : cdsWatchers.values()) {
Expand Down Expand Up @@ -462,6 +481,7 @@ public void onError(Status error) {
@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
cleanUpRdsWatcher();
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}

Expand Down
Loading

0 comments on commit 5422bfd

Please sign in to comment.