diff --git a/test/xds/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go
index 80dc7de8e2f8..75dd77ba195d 100644
--- a/test/xds/xds_client_federation_test.go
+++ b/test/xds/xds_client_federation_test.go
@@ -188,7 +188,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) {
 	target = fmt.Sprintf("xds://unknown-authority/%s", serviceName)
 	t.Logf("Dialing target %q with unknown authority which is expected to fail", target)
-	const wantErr = `authority "unknown-authority" is not found in the bootstrap file`
+	wantErr := fmt.Sprintf("authority \"unknown-authority\" specified in dial target %q is not found in the bootstrap file", target)
 	_, err = grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
 	if err == nil || !strings.Contains(err.Error(), wantErr) {
 		t.Fatalf("grpc.Dial(%q) returned %v, want: %s", target, err, wantErr)
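The assertion above depends on how gRPC decomposes an `xds://` dial target: the authority is the URL host, and the listener name comes from the path. A quick illustration using only the standard library (grpc-go's `resolver.Target` wraps a `url.URL` parsed the same way; this snippet is illustrative, not grpc-go code):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// The same shape of target the test dials above.
	u, err := url.Parse("xds://unknown-authority/myservice")
	if err != nil {
		panic(err)
	}
	// Host is the xDS authority, looked up under "authorities" in the
	// bootstrap file; Path (minus the leading "/") becomes the endpoint
	// used to build the Listener resource name.
	fmt.Println(u.Host) // unknown-authority
	fmt.Println(u.Path) // /myservice
}
```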
diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go
index e742e2a0f085..88cb1d2a1fdd 100644
--- a/xds/internal/resolver/serviceconfig.go
+++ b/xds/internal/resolver/serviceconfig.go
@@ -39,7 +39,6 @@ import (
 	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
 	"google.golang.org/grpc/xds/internal/balancer/ringhash"
 	"google.golang.org/grpc/xds/internal/httpfilter"
-	rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )
 
@@ -72,16 +71,6 @@ type xdsClusterManagerConfig struct {
 	Children map[string]xdsChildConfig `json:"children"`
 }
 
-// pruneActiveClusters deletes entries in r.activeClusters with zero
-// references.
-func (r *xdsResolver) pruneActiveClusters() {
-	for cluster, ci := range r.activeClusters {
-		if atomic.LoadInt32(&ci.refCount) == 0 {
-			delete(r.activeClusters, cluster)
-		}
-	}
-}
-
 // serviceConfigJSON produces a service config in JSON format representing all
 // the clusters referenced in activeClusters. This includes clusters with zero
 // references, so they must be pruned first.
@@ -193,10 +182,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
 			if v := atomic.AddInt32(ref, -1); v == 0 {
 				// This entry will be removed from activeClusters when
 				// producing the service config for the empty update.
-				select {
-				case cs.r.updateCh <- suWithError{emptyUpdate: true}:
-				default:
-				}
+				cs.r.serializer.Schedule(func(context.Context) {
+					cs.r.onClusterRefDownToZero()
+				})
 			}
 		},
 		Interceptor: interceptor,
@@ -338,97 +326,10 @@ func (cs *configSelector) stop() {
 	// selector; we need another update to delete clusters from the config (if
 	// we don't have another update pending already).
 	if needUpdate {
-		select {
-		case cs.r.updateCh <- suWithError{emptyUpdate: true}:
-		default:
-		}
-	}
-}
-
-// newConfigSelector creates the config selector for su; may add entries to
-// r.activeClusters for previously-unseen clusters.
-func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
-	cs := &configSelector{
-		r: r,
-		virtualHost: virtualHost{
-			httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
-			retryConfig:              su.virtualHost.RetryConfig,
-		},
-		routes:           make([]route, len(su.virtualHost.Routes)),
-		clusters:         make(map[string]*clusterInfo),
-		httpFilterConfig: su.ldsConfig.httpFilterConfig,
+		cs.r.serializer.Schedule(func(context.Context) {
+			cs.r.onClusterRefDownToZero()
+		})
 	}
-
-	for i, rt := range su.virtualHost.Routes {
-		clusters := rinternal.NewWRR.(func() wrr.WRR)()
-		if rt.ClusterSpecifierPlugin != "" {
-			clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
-			clusters.Add(&routeCluster{
-				name: clusterName,
-			}, 1)
-			cs.initializeCluster(clusterName, xdsChildConfig{
-				ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
-			})
-		} else {
-			for cluster, wc := range rt.WeightedClusters {
-				clusterName := clusterPrefix + cluster
-				clusters.Add(&routeCluster{
-					name:                     clusterName,
-					httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
-				}, int64(wc.Weight))
-				cs.initializeCluster(clusterName, xdsChildConfig{
-					ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
-				})
-			}
-		}
-		cs.routes[i].clusters = clusters
-
-		var err error
-		cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
-		if err != nil {
-			return nil, err
-		}
-		cs.routes[i].actionType = rt.ActionType
-		if rt.MaxStreamDuration == nil {
-			cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
-		} else {
-			cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
-		}
-
-		cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
-		cs.routes[i].retryConfig = rt.RetryConfig
-		cs.routes[i].hashPolicies = rt.HashPolicies
-	}
-
-	// Account for this config selector's clusters. Do this after no further
-	// errors may occur. Note: cs.clusters are pointers to entries in
-	// activeClusters.
-	for _, ci := range cs.clusters {
-		atomic.AddInt32(&ci.refCount, 1)
-	}
-
-	return cs, nil
-}
-
-// initializeCluster initializes entries in cs.clusters map, creating entries in
-// r.activeClusters as necessary. Any created entries will have a ref count set
-// to zero as their ref count will be incremented by incRefs.
-func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
-	ci := cs.r.activeClusters[clusterName]
-	if ci == nil {
-		ci = &clusterInfo{refCount: 0}
-		cs.r.activeClusters[clusterName] = ci
-	}
-	cs.clusters[clusterName] = ci
-	cs.clusters[clusterName].cfg = cfg
-}
-
-type clusterInfo struct {
-	// number of references to this cluster; accessed atomically
-	refCount int32
-	// cfg is the child configuration for this cluster, containing either the
-	// csp config or the cds cluster config.
-	cfg xdsChildConfig
 }
 
 type interceptorList struct {
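With `updateCh` gone, the `OnCommitted` hook and `configSelector.stop` now hand `onClusterRefDownToZero` to the serializer instead of doing a best-effort, non-blocking channel send that could silently drop the notification. The underlying ref-count idea is unchanged; here is a minimal sketch of that pattern (hypothetical types, not the actual `clusterInfo`/`configSelector` wiring):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cluster stays in the generated service config while at least one RPC or
// config selector still references it.
type cluster struct {
	name     string
	refCount int32 // accessed atomically
}

// acquire is called when a config selector routes an RPC to this cluster.
func (c *cluster) acquire() { atomic.AddInt32(&c.refCount, 1) }

// release is called when the RPC commits; onZero runs when the last
// reference drops, prompting a pruned service config to be regenerated.
func (c *cluster) release(onZero func()) {
	if atomic.AddInt32(&c.refCount, -1) == 0 {
		onZero()
	}
}

func main() {
	c := &cluster{name: "cluster:backend"}
	c.acquire()
	c.release(func() { fmt.Printf("%s unreferenced; prune and resend config\n", c.name) })
}
```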
diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go
index 4f8609ce9df5..abb3c2c5acf1 100644
--- a/xds/internal/resolver/watch_service.go
+++ b/xds/internal/resolver/watch_service.go
@@ -19,185 +19,77 @@
 package resolver
 
 import (
-	"fmt"
-	"sync"
-	"time"
+	"context"
 
-	"google.golang.org/grpc/internal/grpclog"
-	"google.golang.org/grpc/internal/pretty"
-	"google.golang.org/grpc/xds/internal/clusterspecifier"
-	"google.golang.org/grpc/xds/internal/xdsclient"
 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 )
 
-// serviceUpdate contains information received from the LDS/RDS responses which
-// are of interest to the xds resolver. The RDS request is built by first
-// making a LDS to get the RouteConfig name.
-type serviceUpdate struct {
-	// virtualHost contains routes and other configuration to route RPCs.
-	virtualHost *xdsresource.VirtualHost
-	// clusterSpecifierPlugins contains the configurations for any cluster
-	// specifier plugins emitted by the xdsclient.
-	clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
-	// ldsConfig contains configuration that applies to all routes.
-	ldsConfig ldsConfig
+type listenerWatcher struct {
+	resourceName string
+	cancel       func()
+	parent       *xdsResolver
 }
 
-// ldsConfig contains information received from the LDS responses which are of
-// interest to the xds resolver.
-type ldsConfig struct {
-	// maxStreamDuration is from the HTTP connection manager's
-	// common_http_protocol_options field.
-	maxStreamDuration time.Duration
-	httpFilterConfig  []xdsresource.HTTPFilter
+func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher {
+	lw := &listenerWatcher{resourceName: resourceName, parent: parent}
+	lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw)
+	return lw
 }
 
-// watchService uses LDS and RDS to discover information about the provided
-// serviceName.
-//
-// Note that during race (e.g. an xDS response is received while the user is
-// calling cancel()), there's a small window where the callback can be called
-// after the watcher is canceled. The caller needs to handle this case.
-//
-// TODO(easwars): Make this function a method on the xdsResolver type.
-// Currently, there is a single call site for this function, and all arguments
-// passed to it are fields of the xdsResolver type.
-func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
-	w := &serviceUpdateWatcher{
-		logger:      logger,
-		c:           c,
-		serviceName: serviceName,
-		serviceCb:   cb,
-	}
-	w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)
-
-	return w.close
+func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
+	l.parent.serializer.Schedule(func(context.Context) {
+		l.parent.onListenerResourceUpdate(update.Resource)
+	})
 }
 
-// serviceUpdateWatcher handles LDS and RDS response, and calls the service
-// callback at the right time.
-type serviceUpdateWatcher struct {
-	logger      *grpclog.PrefixLogger
-	c           xdsclient.XDSClient
-	serviceName string
-	ldsCancel   func()
-	serviceCb   func(serviceUpdate, error)
-	lastUpdate  serviceUpdate
-
-	mu        sync.Mutex
-	closed    bool
-	rdsName   string
-	rdsCancel func()
+func (l *listenerWatcher) OnError(err error) {
+	l.parent.serializer.Schedule(func(context.Context) {
+		l.parent.onListenerResourceError(err)
+	})
 }
 
-func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
-	w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if w.closed {
-		return
-	}
-	if err != nil {
-		// We check the error type and do different things. For now, the only
-		// type we check is ResourceNotFound, which indicates the LDS resource
-		// was removed, and besides sending the error to callback, we also
-		// cancel the RDS watch.
-		if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
-			w.rdsCancel()
-			w.rdsName = ""
-			w.rdsCancel = nil
-			w.lastUpdate = serviceUpdate{}
-		}
-		// The other error cases still return early without canceling the
-		// existing RDS watch.
-		w.serviceCb(serviceUpdate{}, err)
-		return
-	}
-
-	w.lastUpdate.ldsConfig = ldsConfig{
-		maxStreamDuration: update.MaxStreamDuration,
-		httpFilterConfig:  update.HTTPFilters,
-	}
-
-	if update.InlineRouteConfig != nil {
-		// If there was an RDS watch, cancel it.
-		w.rdsName = ""
-		if w.rdsCancel != nil {
-			w.rdsCancel()
-			w.rdsCancel = nil
-		}
+func (l *listenerWatcher) OnResourceDoesNotExist() {
+	l.parent.serializer.Schedule(func(context.Context) {
+		l.parent.onListenerResourceNotFound()
+	})
+}
 
-		// Handle the inline RDS update as if it's from an RDS watch.
-		w.applyRouteConfigUpdate(*update.InlineRouteConfig)
-		return
-	}
+func (l *listenerWatcher) stop() {
+	l.cancel()
+	l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
+}
 
-	// RDS name from update is not an empty string, need RDS to fetch the
-	// routes.
+type routeConfigWatcher struct {
+	resourceName string
+	cancel       func()
+	parent       *xdsResolver
+}
 
-	if w.rdsName == update.RouteConfigName {
-		// If the new RouteConfigName is same as the previous, don't cancel and
-		// restart the RDS watch.
-		//
-		// If the route name did change, then we must wait until the first RDS
-		// update before reporting this LDS config.
-		if w.lastUpdate.virtualHost != nil {
-			// We want to send an update with the new fields from the new LDS
-			// (e.g. max stream duration), and old fields from the previous
-			// RDS.
-			//
-			// But note that this should only happen when virtual host is set,
-			// which means an RDS was received.
-			w.serviceCb(w.lastUpdate, nil)
-		}
-		return
-	}
-	w.rdsName = update.RouteConfigName
-	if w.rdsCancel != nil {
-		w.rdsCancel()
-	}
-	w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
+func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher {
+	rw := &routeConfigWatcher{resourceName: resourceName, parent: parent}
+	rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw)
+	return rw
 }
 
-func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
-	matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
-	if matchVh == nil {
-		// No matching virtual host found.
-		w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
-		return
-	}
+func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
+	r.parent.serializer.Schedule(func(context.Context) {
+		r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
+	})
+}
 
-	w.lastUpdate.virtualHost = matchVh
-	w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
-	w.serviceCb(w.lastUpdate, nil)
+func (r *routeConfigWatcher) OnError(err error) {
+	r.parent.serializer.Schedule(func(context.Context) {
+		r.parent.onRouteConfigResourceError(r.resourceName, err)
+	})
 }
 
-func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
-	w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if w.closed {
-		return
-	}
-	if w.rdsCancel == nil {
-		// This mean only the RDS watch is canceled, can happen if the LDS
-		// resource is removed.
-		return
-	}
-	if err != nil {
-		w.serviceCb(serviceUpdate{}, err)
-		return
-	}
-	w.applyRouteConfigUpdate(update)
+func (r *routeConfigWatcher) OnResourceDoesNotExist() {
+	r.parent.serializer.Schedule(func(context.Context) {
+		r.parent.onRouteConfigResourceNotFound(r.resourceName)
+	})
 }
 
-func (w *serviceUpdateWatcher) close() {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	w.closed = true
-	w.ldsCancel()
-	if w.rdsCancel != nil {
-		w.rdsCancel()
-		w.rdsCancel = nil
-	}
+func (r *routeConfigWatcher) stop() {
+	r.cancel()
+	r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
 }
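Both watcher types are deliberately thin: every callback is bounced onto the resolver's `grpcsync.CallbackSerializer`, so all resolver state is mutated from a single callback goroutine and the old mutex-guarded `serviceUpdateWatcher` bookkeeping disappears. A rough sketch of what such a serializer looks like (simplified; the real `grpcsync.CallbackSerializer` is internal to grpc-go and differs in its API and buffering):

```go
package main

import (
	"context"
	"fmt"
)

// serializer runs scheduled callbacks one at a time, in order, until the
// provided context is canceled.
type serializer struct {
	callbacks chan func(context.Context)
	done      chan struct{}
}

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{
		callbacks: make(chan func(context.Context), 16),
		done:      make(chan struct{}),
	}
	go func() {
		defer close(s.done)
		for {
			select {
			case cb := <-s.callbacks:
				cb(ctx)
			case <-ctx.Done():
				// Drain callbacks that were scheduled before cancellation.
				for {
					select {
					case cb := <-s.callbacks:
						cb(ctx)
					default:
						return
					}
				}
			}
		}
	}()
	return s
}

// Schedule queues cb to run on the serializer's goroutine. This sketch blocks
// when the buffer is full, a limitation the real implementation avoids.
func (s *serializer) Schedule(cb func(context.Context)) { s.callbacks <- cb }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newSerializer(ctx)
	s.Schedule(func(context.Context) { fmt.Println("listener update") })
	s.Schedule(func(context.Context) { fmt.Println("route config update") })
	cancel()
	<-s.done // mirrors r.serializerCancel(); <-r.serializer.Done() in Close()
}
```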
diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go
index 2a9b6332f87a..a517aa2e4e55 100644
--- a/xds/internal/resolver/xds_resolver.go
+++ b/xds/internal/resolver/xds_resolver.go
@@ -20,9 +20,10 @@
 package resolver
 
 import (
-	"errors"
+	"context"
 	"fmt"
 	"strings"
+	"sync/atomic"
 
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/internal"
@@ -75,8 +76,6 @@ type xdsResolverBuilder struct {
 func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) {
 	r := &xdsResolver{
 		cc:             cc,
-		closed:         grpcsync.NewEvent(),
-		updateCh:       make(chan suWithError, 1),
 		activeClusters: make(map[string]*clusterInfo),
 		channelID:      grpcrand.Uint64(),
 	}
@@ -88,26 +87,65 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
 	r.logger = prefixLogger(r)
 	r.logger.Infof("Creating resolver for target: %+v", target)
 
+	// Initialize the serializer used to synchronize the following:
+	// - updates from the xDS client. This could lead to generation of new
+	//   service config if resolution is complete.
+	// - completion of an RPC to a removed cluster causing the associated ref
+	//   count to become zero, resulting in generation of new service config.
+	// - stopping of a config selector that results in generation of new
+	//   service config.
+	ctx, cancel := context.WithCancel(context.Background())
+	r.serializer = grpcsync.NewCallbackSerializer(ctx)
+	r.serializerCancel = cancel
+
+	// Initialize the xDS client.
 	newXDSClient := rinternal.NewXDSClient.(func() (xdsclient.XDSClient, func(), error))
 	if b.newXDSClient != nil {
 		newXDSClient = b.newXDSClient
 	}
-
 	client, close, err := newXDSClient()
 	if err != nil {
 		return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
 	}
 	r.xdsClient = client
 	r.xdsClientClose = close
+
+	// Determine the listener resource name and start a watcher for it.
+	template, err := r.sanityChecksOnBootstrapConfig(target, opts, r.xdsClient)
+	if err != nil {
+		return nil, err
+	}
+	endpoint := target.URL.Path
+	if endpoint == "" {
+		endpoint = target.URL.Opaque
+	}
+	endpoint = strings.TrimPrefix(endpoint, "/")
+	r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint)
+	r.listenerWatcher = newListenerWatcher(r.ldsResourceName, r)
+	return r, nil
+}
+
+// Performs the following sanity checks:
+//   - Verifies that the bootstrap configuration is not empty.
+//   - Verifies that if xDS credentials are specified by the user, the
+//     bootstrap configuration contains certificate providers.
+//   - Verifies that if the provided dial target contains an authority, the
+//     bootstrap configuration contains server config for that authority.
+//
+// Returns the listener resource name template to use. If any of the above
+// validations fail, a non-nil error is returned.
+func (r *xdsResolver) sanityChecksOnBootstrapConfig(target resolver.Target, opts resolver.BuildOptions, client xdsclient.XDSClient) (string, error) {
 	bootstrapConfig := client.BootstrapConfig()
 	if bootstrapConfig == nil {
-		return nil, errors.New("bootstrap configuration is empty")
+		// This is never expected to happen after a successful xDS client
+		// creation. Defensive programming.
+		return "", fmt.Errorf("xds: bootstrap configuration is empty")
 	}
 
-	// If xds credentials were specified by the user, but bootstrap configs do
-	// not contain any certificate provider configuration, it is better to fail
-	// right now rather than failing when attempting to create certificate
-	// providers after receiving an CDS response with security configuration.
+	// If xDS credentials were specified by the user, but the bootstrap config
+	// does not contain any certificate providers, it is better to fail right
+	// now rather than failing when attempting to create certificate providers
+	// after receiving a CDS response with security configuration.
 	var creds credentials.TransportCredentials
 	switch {
 	case opts.DialCreds != nil:
@@ -117,7 +155,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
 	}
 	if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
 		if len(bootstrapConfig.CertProviderConfigs) == 0 {
-			return nil, fmt.Errorf("xds: use of xDS credentials is specified, but certificate_providers config missing in bootstrap file")
+			return "", fmt.Errorf("xds: use of xDS credentials is specified, but certificate_providers config missing in bootstrap file")
 		}
 	}
 
@@ -128,7 +166,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
 	if authority := target.URL.Host; authority != "" {
 		a := bootstrapConfig.Authorities[authority]
 		if a == nil {
-			return nil, fmt.Errorf("xds: authority %q is not found in the bootstrap file", authority)
+			return "", fmt.Errorf("xds: authority %q specified in dial target %q is not found in the bootstrap file", authority, target)
 		}
 		if a.ClientListenerResourceNameTemplate != "" {
 			// This check will never be false, because
@@ -137,23 +175,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
 			template = a.ClientListenerResourceNameTemplate
 		}
 	}
-	endpoint := target.URL.Path
-	if endpoint == "" {
-		endpoint = target.URL.Opaque
-	}
-	endpoint = strings.TrimPrefix(endpoint, "/")
-	r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint)
-
-	// Register a watch on the xdsClient for the resource name determined above.
-	cancelWatch := watchService(r.xdsClient, r.ldsResourceName, r.handleServiceUpdate, r.logger)
-	r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
-	r.cancelWatch = func() {
-		cancelWatch()
-		r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
-	}
-
-	go r.run()
-	return r, nil
+	return template, nil
 }
 
 // Name helps implement the resolver.Builder interface.
@@ -161,43 +183,73 @@ func (*xdsResolverBuilder) Scheme() string {
 	return Scheme
 }
 
-// suWithError wraps the ServiceUpdate and error received through a watch API
-// callback, so that it can pushed onto the update channel as a single entity.
-type suWithError struct {
-	su          serviceUpdate
-	emptyUpdate bool
-	err         error
-}
-
 // xdsResolver implements the resolver.Resolver interface.
 //
 // It registers a watcher for ServiceConfig updates with the xdsClient object
 // (which performs LDS/RDS queries for the same), and passes the received
 // updates to the ClientConn.
 type xdsResolver struct {
-	cc              resolver.ClientConn
-	closed          *grpcsync.Event
-	logger          *grpclog.PrefixLogger
-	ldsResourceName string
-
+	cc     resolver.ClientConn
+	logger *grpclog.PrefixLogger
 	// The underlying xdsClient which performs all xDS requests and responses.
 	xdsClient      xdsclient.XDSClient
 	xdsClientClose func()
-	// A channel for the watch API callback to write service updates on to. The
-	// updates are read by the run goroutine and passed on to the ClientConn.
-	updateCh chan suWithError
-	// cancelWatch is the function to cancel the watcher.
-	cancelWatch func()
-
-	// activeClusters is a map from cluster name to a ref count. Only read or
-	// written during a service update (synchronous).
+	// A random number which uniquely identifies the channel which owns this
+	// resolver.
+	channelID uint64
+
+	// All methods on the xdsResolver type except for the ones invoked by gRPC,
+	// i.e. ResolveNow() and Close(), are guaranteed to execute in the context of
+	// this serializer's callback. And since the serializer guarantees mutual
+	// exclusion among these callbacks, we can get by without any mutexes to
+	// access all of the below defined state. The only exception is Close(),
+	// which does access some of this shared state, but it does so after
+	// cancelling the context passed to the serializer.
+	serializer       *grpcsync.CallbackSerializer
+	serializerCancel context.CancelFunc
+
+	ldsResourceName     string
+	listenerWatcher     *listenerWatcher
+	listenerUpdateRecvd bool
+	currentListener     xdsresource.ListenerUpdate
+
+	rdsResourceName        string
+	routeConfigWatcher     *routeConfigWatcher
+	routeConfigUpdateRecvd bool
+	currentRouteConfig     xdsresource.RouteConfigUpdate
+	currentVirtualHost     *xdsresource.VirtualHost // Matched virtual host for quick access.
+
+	// activeClusters is a map from cluster name to information about the
+	// cluster that includes a ref count and load balancing configuration.
 	activeClusters map[string]*clusterInfo
 
 	curConfigSelector *configSelector
+}
 
-	// A random number which uniquely identifies the channel which owns this
-	// resolver.
-	channelID uint64
+// ResolveNow is a no-op at this point.
+func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
+
+func (r *xdsResolver) Close() {
+	// Cancel the context passed to the serializer and wait for any scheduled
+	// callbacks to complete. Canceling the context ensures that no new
+	// callbacks will be scheduled.
+	r.serializerCancel()
+	<-r.serializer.Done()
+
+	// Note that Close needs to check for nils even if some of them are always
+	// set in the constructor. This is because the constructor defers Close() in
+	// error cases, and the fields might not be set when the error happens.
+
+	if r.listenerWatcher != nil {
+		r.listenerWatcher.stop()
+	}
+	if r.routeConfigWatcher != nil {
+		r.routeConfigWatcher.stop()
+	}
+	if r.xdsClientClose != nil {
+		r.xdsClientClose()
+	}
+	r.logger.Infof("Shutdown")
 }
 
 // sendNewServiceConfig prunes active clusters, generates a new service config
@@ -205,6 +257,8 @@ type xdsResolver struct {
 // channel with that service config and the provided config selector. Returns
 // false if an error occurs while generating the service config and the update
 // cannot be sent.
+//
+// Only executed in the context of a serializer callback.
 func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
 	// Delete entries from r.activeClusters with zero references;
 	// otherwise serviceConfigJSON will generate a config including
@@ -222,11 +276,11 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
 	sc, err := serviceConfigJSON(r.activeClusters)
 	if err != nil {
 		// JSON marshal error; should never happen.
-		r.logger.Errorf("%v", err)
+		r.logger.Errorf("For Listener resource %q and RouteConfiguration resource %q, failed to marshal newly built service config: %v", r.ldsResourceName, r.rdsResourceName, err)
 		r.cc.ReportError(err)
 		return false
 	}
-	r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.xdsClient, pretty.FormatJSON(sc))
+	r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc))
 
 	// Send the update to the ClientConn.
 	state := iresolver.SetConfigSelector(resolver.State{
@@ -236,94 +290,294 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
 	return true
 }
 
-// run is a long running goroutine which blocks on receiving service updates
-// and passes it on the ClientConn.
-func (r *xdsResolver) run() {
-	for {
-		select {
-		case <-r.closed.Done():
-			return
-		case update := <-r.updateCh:
-			if update.err != nil {
-				r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.xdsClient, update.err)
-				if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
-					// If error is resource-not-found, it means the LDS
-					// resource was removed. Ultimately send an empty service
-					// config, which picks pick-first, with no address, and
-					// puts the ClientConn into transient failure. Before we
-					// can do that, we may need to send a normal service config
-					// along with an erroring (nil) config selector.
-					r.sendNewServiceConfig(nil)
-					// Stop and dereference the active config selector, if one exists.
-					r.curConfigSelector.stop()
-					r.curConfigSelector = nil
-					continue
-				}
-				// Send error to ClientConn, and balancers, if error is not
-				// resource not found. No need to update resolver state if we
-				// can keep using the old config.
-				r.cc.ReportError(update.err)
-				continue
-			}
-			if update.emptyUpdate {
-				r.sendNewServiceConfig(r.curConfigSelector)
-				continue
-			}
+// newConfigSelector creates a new config selector using the most recently
+// received listener and route config updates. May add entries to
+// r.activeClusters for previously-unseen clusters.
+//
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
+	cs := &configSelector{
+		r: r,
+		virtualHost: virtualHost{
+			httpFilterConfigOverride: r.currentVirtualHost.HTTPFilterConfigOverride,
+			retryConfig:              r.currentVirtualHost.RetryConfig,
+		},
+		routes:           make([]route, len(r.currentVirtualHost.Routes)),
+		clusters:         make(map[string]*clusterInfo),
+		httpFilterConfig: r.currentListener.HTTPFilters,
+	}
 
-			// Create the config selector for this update.
-			cs, err := r.newConfigSelector(update.su)
-			if err != nil {
-				r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.xdsClient, err)
-				r.cc.ReportError(err)
-				continue
+	for i, rt := range r.currentVirtualHost.Routes {
+		clusters := rinternal.NewWRR.(func() wrr.WRR)()
+		if rt.ClusterSpecifierPlugin != "" {
+			clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
+			clusters.Add(&routeCluster{
+				name: clusterName,
+			}, 1)
+			ci := r.addOrGetActiveClusterInfo(clusterName)
+			ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
+			cs.clusters[clusterName] = ci
+		} else {
+			for cluster, wc := range rt.WeightedClusters {
+				clusterName := clusterPrefix + cluster
+				clusters.Add(&routeCluster{
+					name:                     clusterName,
+					httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
+				}, int64(wc.Weight))
+				ci := r.addOrGetActiveClusterInfo(clusterName)
+				ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster})}
+				cs.clusters[clusterName] = ci
 			}
+		}
+		cs.routes[i].clusters = clusters
 
-			if !r.sendNewServiceConfig(cs) {
-				// JSON error creating the service config (unexpected); erase
-				// this config selector and ignore this update, continuing with
-				// the previous config selector.
-				cs.stop()
-				continue
-			}
+		var err error
+		cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
+		if err != nil {
+			return nil, err
+		}
+		cs.routes[i].actionType = rt.ActionType
+		if rt.MaxStreamDuration == nil {
+			cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration
+		} else {
+			cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
+		}
+
+		cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
+		cs.routes[i].retryConfig = rt.RetryConfig
+		cs.routes[i].hashPolicies = rt.HashPolicies
+	}
 
-			// Decrement references to the old config selector and assign the
-			// new one as the current one.
-			r.curConfigSelector.stop()
-			r.curConfigSelector = cs
+	// Account for this config selector's clusters. Do this after no further
+	// errors may occur. Note: cs.clusters are pointers to entries in
+	// activeClusters.
+	for _, ci := range cs.clusters {
+		atomic.AddInt32(&ci.refCount, 1)
+	}
+
+	return cs, nil
+}
+
+// pruneActiveClusters deletes entries in r.activeClusters with zero
+// references.
+func (r *xdsResolver) pruneActiveClusters() {
+	for cluster, ci := range r.activeClusters {
+		if atomic.LoadInt32(&ci.refCount) == 0 {
+			delete(r.activeClusters, cluster)
 		}
 	}
 }
 
-// handleServiceUpdate is the callback which handles service updates. It writes
-// the received update to the update channel, which is picked by the run
-// goroutine.
-func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) {
-	if r.closed.HasFired() {
-		// Do not pass updates to the ClientConn once the resolver is closed.
+func (r *xdsResolver) addOrGetActiveClusterInfo(name string) *clusterInfo {
+	ci := r.activeClusters[name]
+	if ci != nil {
+		return ci
+	}
+
+	ci = &clusterInfo{refCount: 0}
+	r.activeClusters[name] = ci
+	return ci
+}
+
+type clusterInfo struct {
+	// number of references to this cluster; accessed atomically
+	refCount int32
+	// cfg is the child configuration for this cluster, containing either the
+	// csp config or the cds cluster config.
+	cfg xdsChildConfig
+}
+
+// Determines if the xdsResolver has received all required configuration, i.e.
+// Listener and RouteConfiguration resources, from the management server, and
+// whether a matching virtual host was found in the RouteConfiguration resource.
+func (r *xdsResolver) resolutionComplete() bool {
+	return r.listenerUpdateRecvd && r.routeConfigUpdateRecvd && r.currentVirtualHost != nil
+}
+
+// onResolutionComplete performs the following actions when resolution is
+// complete, i.e. Listener and RouteConfiguration resources have been received
+// from the management server and a matching virtual host is found in the
+// latter.
+//   - creates a new config selector (this involves incrementing references to
+//     clusters owned by this config selector).
+//   - stops the old config selector (this involves decrementing references to
+//     clusters owned by this config selector).
+//   - prunes active clusters and pushes a new service config to the channel.
+//   - updates the current config selector used by the resolver.
+//
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onResolutionComplete() {
+	if !r.resolutionComplete() {
+		return
+	}
+
+	cs, err := r.newConfigSelector()
+	if err != nil {
+		r.logger.Warningf("Failed to build a config selector for resource %q: %v", r.ldsResourceName, err)
+		r.cc.ReportError(err)
 		return
 	}
-	// Remove any existing entry in updateCh and replace with the new one.
-	select {
-	case <-r.updateCh:
-	default:
+
+	if !r.sendNewServiceConfig(cs) {
+		// JSON error creating the service config (unexpected); erase
+		// this config selector and ignore this update, continuing with
+		// the previous config selector.
+		cs.stop()
+		return
 	}
-	r.updateCh <- suWithError{su: su, err: err}
+
+	r.curConfigSelector.stop()
+	r.curConfigSelector = cs
 }
 
-// ResolveNow is a no-op at this point.
-func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
+func (r *xdsResolver) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
+	matchVh := xdsresource.FindBestMatchingVirtualHost(r.ldsResourceName, update.VirtualHosts)
+	if matchVh == nil {
+		r.onError(fmt.Errorf("no matching virtual host found for %q", r.ldsResourceName))
+		return
+	}
 
-// Close closes the resolver, and also closes the underlying xdsClient.
-func (r *xdsResolver) Close() {
-	// Note that Close needs to check for nils even if some of them are always
-	// set in the constructor. This is because the constructor defers Close() in
-	// error cases, and the fields might not be set when the error happens.
-	if r.cancelWatch != nil {
-		r.cancelWatch()
+	r.currentRouteConfig = update
+	r.currentVirtualHost = matchVh
+	r.routeConfigUpdateRecvd = true
+
+	r.onResolutionComplete()
+}
+
+// onError propagates the error up to the channel. And since this is invoked
+// only for non resource-not-found errors, we don't have to update resolver
+// state and we can keep using the old config.
+//
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onError(err error) {
+	r.cc.ReportError(err)
+}
+
+// Contains common functionality to be executed when resources of either type
+// are removed.
+//
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onResourceNotFound() {
+	// We cannot remove clusters from the service config that have ongoing RPCs.
+	// Instead, what we can do is to send an erroring (nil) config selector
+	// along with normal service config. This will ensure that new RPCs will
+	// fail, and once the active RPCs complete, the reference counts on the
+	// clusters will come down to zero. At that point, we will send an empty
+	// service config with no addresses. This results in the pick-first
+	// LB policy being configured on the channel, and since there are no
+	// addresses, pick-first will put the channel in TRANSIENT_FAILURE.
+	r.sendNewServiceConfig(nil)
+
+	// Stop and dereference the active config selector, if one exists.
+	r.curConfigSelector.stop()
+	r.curConfigSelector = nil
+}
+
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate) {
+	if r.logger.V(2) {
+		r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update))
 	}
-	if r.xdsClientClose != nil {
-		r.xdsClientClose()
+
+	r.currentListener = update
+	r.listenerUpdateRecvd = true
+
+	if update.InlineRouteConfig != nil {
+		// If there was a previous route config watcher because of a non-inline
+		// route configuration, cancel it.
+		r.rdsResourceName = ""
+		if r.routeConfigWatcher != nil {
+			r.routeConfigWatcher.stop()
+			r.routeConfigWatcher = nil
+		}
+
+		r.applyRouteConfigUpdate(*update.InlineRouteConfig)
+		return
 	}
-	r.closed.Fire()
-	r.logger.Infof("Shutdown")
+
+	// We get here only if there was no inline route configuration.
+
+	// If the route config name has not changed, send an update with existing
+	// route configuration and the newly received listener configuration.
+	if r.rdsResourceName == update.RouteConfigName {
+		r.onResolutionComplete()
+		return
+	}
+
+	// If the route config name has changed, cancel the old watcher and start a
+	// new one. At this point, since we have not yet resolved the new route
+	// config name, we don't send an update to the channel, and therefore
+	// continue using the old route configuration (if received) until the new
+	// one is received.
+	r.rdsResourceName = update.RouteConfigName
+	if r.routeConfigWatcher != nil {
+		r.routeConfigWatcher.stop()
+		r.currentVirtualHost = nil
+		r.routeConfigUpdateRecvd = false
+	}
+	r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
+}
+
+func (r *xdsResolver) onListenerResourceError(err error) {
+	if r.logger.V(2) {
+		r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
+	}
+	r.onError(err)
+}
+
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onListenerResourceNotFound() {
+	if r.logger.V(2) {
+		r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
+	}
+
+	r.listenerUpdateRecvd = false
+
+	if r.routeConfigWatcher != nil {
+		r.routeConfigWatcher.stop()
+	}
+	r.rdsResourceName = ""
+	r.currentVirtualHost = nil
+	r.routeConfigUpdateRecvd = false
+	r.routeConfigWatcher = nil
+
+	r.onResourceNotFound()
+}
+
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresource.RouteConfigUpdate) {
+	if r.logger.V(2) {
+		r.logger.Infof("Received update for RouteConfiguration resource %q: %v", name, pretty.ToJSON(update))
+	}
+
+	if r.rdsResourceName != name {
+		// Drop updates from canceled watchers.
+		return
+	}
+
+	r.applyRouteConfigUpdate(update)
+}
+
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
+	if r.logger.V(2) {
+		r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
	}
+	r.onError(err)
+}
+
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
+	if r.logger.V(2) {
+		r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
+	}
+
+	if r.rdsResourceName != name {
+		return
+	}
+	r.onResourceNotFound()
+}
+
+// Only executed in the context of a serializer callback.
+func (r *xdsResolver) onClusterRefDownToZero() {
+	r.sendNewServiceConfig(r.curConfigSelector)
 }
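For context on the watcher wiring above: `Build()` computed `r.ldsResourceName` by substituting the dial target's endpoint into the listener resource name template (the default template is just `%s`; a federation-capable authority can configure an `xdstp://` template in the bootstrap file). A simplified, hypothetical stand-in for `bootstrap.PopulateResourceTemplate`, shown only to illustrate the substitution (the real helper also percent-encodes the endpoint for `xdstp://` templates):

```go
package main

import (
	"fmt"
	"strings"
)

// populateTemplate is a simplified stand-in for grpc-go's
// bootstrap.PopulateResourceTemplate: it substitutes the endpoint into the
// template's "%s" placeholder. Percent-encoding for xdstp:// templates is
// elided here.
func populateTemplate(template, endpoint string) string {
	return strings.Replace(template, "%s", endpoint, 1)
}

func main() {
	// Default template: the endpoint itself is the Listener resource name.
	fmt.Println(populateTemplate("%s", "myservice"))
	// An authority-specific, new-style template from the bootstrap file.
	fmt.Println(populateTemplate("xdstp://xds.example.com/envoy.config.listener.v3.Listener/%s", "myservice"))
}
```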
diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go
index 0c898e45ff9e..4030e4031f09 100644
--- a/xds/internal/resolver/xds_resolver_test.go
+++ b/xds/internal/resolver/xds_resolver_test.go
@@ -103,7 +103,7 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) {
 				NodeID:    "node-id",
 				ServerURI: "dummy-management-server",
 			},
-			wantErr: `authority "non-existing-authority" is not found in the bootstrap file`,
+			wantErr: `authority "non-existing-authority" specified in dial target "xds://non-existing-authority/target" is not found in the bootstrap file`,
 		},
 		{
 			name: "xDS creds specified without certificate providers in bootstrap",
@@ -997,7 +997,7 @@ func (s) TestResolverWRR(t *testing.T) {
 	// Read the update pushed by the resolver to the ClientConn.
 	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
 
-	// Make RPCs are verify WRR behavior in the cluster specifier.
+	// Make RPCs to verify WRR behavior in the cluster specifier.
 	picks := map[string]int{}
 	for i := 0; i < 100; i++ {
 		res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
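The WRR test above issues 100 `SelectConfig` calls and tallies which cluster each RPC would be routed to. The resolver obtains its weighted-round-robin implementation through `rinternal.NewWRR` (overridable in tests); a deterministic sketch of the same idea, not grpc-go's actual WRR, might look like this:

```go
package main

import "fmt"

// simpleWRR returns items proportionally to their weights by cycling through
// a flat expansion of the weighted list. Deterministic and fine for a sketch;
// real implementations avoid the O(total weight) memory cost.
type simpleWRR struct {
	expanded []string
	next     int
}

func (w *simpleWRR) Add(item string, weight int64) {
	for i := int64(0); i < weight; i++ {
		w.expanded = append(w.expanded, item)
	}
}

func (w *simpleWRR) Next() string {
	v := w.expanded[w.next]
	w.next = (w.next + 1) % len(w.expanded)
	return v
}

func main() {
	wrr := &simpleWRR{}
	wrr.Add("cluster:A", 75) // e.g. WeightedClusters{"A": 75, "B": 25}
	wrr.Add("cluster:B", 25)

	picks := map[string]int{}
	for i := 0; i < 100; i++ {
		picks[wrr.Next()]++
	}
	fmt.Println(picks) // map[cluster:A:75 cluster:B:25]
}
```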