Skip to content

Commit

Permalink
xds: CDS implementation in v2Client.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Nov 21, 2019
1 parent dc49de8 commit 7c9b3f4
Show file tree
Hide file tree
Showing 10 changed files with 767 additions and 48 deletions.
109 changes: 109 additions & 0 deletions xds/internal/client/cds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package client

import (
"fmt"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"
)

// newCDSRequest generates an CDS request proto for the provided clusterName,
// to be sent out on the wire.
func (v2c *v2Client) newCDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: clusterURL,
ResourceNames: clusterName,
}
}

// sendCDS sends an CDS request for provided clusterName on the provided
// stream.
func (v2c *v2Client) sendCDS(stream adsStream, clusterName []string) bool {
if err := stream.Send(v2c.newCDSRequest(clusterName)); err != nil {
grpclog.Warningf("xds: CDS request for resource %v failed: %v", clusterName, err)
return false
}
return true
}

// handleCDSResponse processes an CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()

wi := v2c.watchMap[cdsResource]
if wi == nil {
return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
}

var returnUpdate cdsUpdate
localCache := make(map[string]cdsUpdate)
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err)
}
cluster, ok := resource.Message.(*xdspb.Cluster)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
}
service, doLRS, err := validateCluster(cluster)
if err != nil {
return err
}

// If the Cluster message in the CDS response did not contain a
// serviceName, we will just use the clusterName for EDS.
if service == "" {
service = cluster.GetName()
}
update := cdsUpdate{service, doLRS}
localCache[cluster.GetName()] = update
if cluster.GetName() == wi.target[0] {
returnUpdate = update
}
}
v2c.cdsCache = localCache

var err error
if returnUpdate.serviceName == "" {
err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
}
wi.stopTimer()
wi.callback.(cdsCallback)(returnUpdate, err)
return nil
}

func validateCluster(cluster *xdspb.Cluster) (string, bool, error) {
switch {
case cluster.GetType() != xdspb.Cluster_EDS:
return "", false, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil:
return "", false, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN:
return "", false, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}

return cluster.GetEdsClusterConfig().GetServiceName(), cluster.GetLrsServer().GetSelf() != nil, nil
}
Loading

0 comments on commit 7c9b3f4

Please sign in to comment.