Skip to content

Commit

Permalink
Review comments #1.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Nov 22, 2019
1 parent 7c9b3f4 commit b7e6dbf
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 42 deletions.
21 changes: 12 additions & 9 deletions xds/internal/client/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,16 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
}
service, doLRS, err := validateCluster(cluster)
update, err := validateCluster(cluster)
if err != nil {
return err
}

// If the Cluster message in the CDS response did not contain a
// serviceName, we will just use the clusterName for EDS.
if service == "" {
service = cluster.GetName()
if update.serviceName == "" {
update.serviceName = cluster.GetName()
}
update := cdsUpdate{service, doLRS}
localCache[cluster.GetName()] = update
if cluster.GetName() == wi.target[0] {
returnUpdate = update
Expand All @@ -95,15 +94,19 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
return nil
}

func validateCluster(cluster *xdspb.Cluster) (string, bool, error) {
func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) {
emptyUpdate := cdsUpdate{serviceName: "", doLRS: false}
switch {
case cluster.GetType() != xdspb.Cluster_EDS:
return "", false, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil:
return "", false, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN:
return "", false, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}

return cluster.GetEdsClusterConfig().GetServiceName(), cluster.GetLrsServer().GetSelf() != nil, nil
return cdsUpdate{
serviceName: cluster.GetEdsClusterConfig().GetServiceName(),
doLRS: cluster.GetLrsServer().GetSelf() != nil,
}, nil
}
52 changes: 21 additions & 31 deletions xds/internal/client/cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate {
}

func TestValidateCluster(t *testing.T) {
emptyUpdate := cdsUpdate{serviceName: "", doLRS: false}
tests := []struct {
name string
cluster *xdspb.Cluster
wantService string
wantDoLRS bool
wantErr bool
name string
cluster *xdspb.Cluster
wantUpdate cdsUpdate
wantErr bool
}{
{
name: "non-eds-cluster-type",
Expand All @@ -71,19 +71,17 @@ func TestValidateCluster(t *testing.T) {
},
LbPolicy: xdspb.Cluster_LEAST_REQUEST,
},
wantService: "",
wantDoLRS: false,
wantErr: true,
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "no-eds-config",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantService: "",
wantDoLRS: false,
wantErr: true,
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "no-ads-config-source",
Expand All @@ -92,9 +90,8 @@ func TestValidateCluster(t *testing.T) {
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantService: "",
wantDoLRS: false,
wantErr: true,
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "non-round-robin-lb-policy",
Expand All @@ -109,9 +106,8 @@ func TestValidateCluster(t *testing.T) {
},
LbPolicy: xdspb.Cluster_LEAST_REQUEST,
},
wantService: "",
wantDoLRS: false,
wantErr: true,
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "happy-case-no-service-name-no-lrs",
Expand All @@ -126,8 +122,7 @@ func TestValidateCluster(t *testing.T) {
},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantService: "",
wantDoLRS: false,
wantUpdate: emptyUpdate,
},
{
name: "happy-case-no-lrs",
Expand All @@ -143,28 +138,23 @@ func TestValidateCluster(t *testing.T) {
},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantService: serviceName1,
wantDoLRS: false,
wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: false},
},
{
name: "happiest-case",
cluster: goodCluster1,
wantService: serviceName1,
wantDoLRS: true,
name: "happiest-case",
cluster: goodCluster1,
wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: true},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotService, gotDoLRS, gotErr := validateCluster(test.cluster)
gotUpdate, gotErr := validateCluster(test.cluster)
if (gotErr != nil) != test.wantErr {
t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", test.cluster, gotErr, test.wantErr)
}
if gotService != test.wantService {
t.Errorf("validateCluster(%+v) returned service: %v, want: %v", test.cluster, gotService, test.wantService)
}
if gotDoLRS != test.wantDoLRS {
t.Errorf("validateCluster(%+v) returned doLRS: %v, want: %v", test.cluster, gotDoLRS, test.wantDoLRS)
if !reflect.DeepEqual(gotUpdate, test.wantUpdate) {
t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate)
}
})
}
Expand Down
16 changes: 14 additions & 2 deletions xds/internal/client/v2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ type v2Client struct {
// validated cluster configurations received in CDS responses. We cache all
// valid cluster configurations, whether or not we are interested in them
// when we received them (because we could become interested in them in the
// future and the server wont send us those resources again). Protected by
// the above mutex.
// future and the server wont send us those resources again). This is only
// to support legacy management servers that do not honor the
// resource_names field. As per the latest spec, the server should resend
// the response when the request changes, even if it had sent the same
// resource earlier (when not asked for). Protected by the above mutex.
cdsCache map[string]cdsUpdate
}

Expand Down Expand Up @@ -171,6 +174,10 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool {
if !v2c.sendRDS(stream, wi.target) {
return false
}
case cdsResource:
if !v2c.sendCDS(stream, wi.target) {
return false
}
case edsResource:
if !v2c.sendEDS(stream, wi.target) {
return false
Expand Down Expand Up @@ -251,6 +258,11 @@ func (v2c *v2Client) recv(stream adsStream) bool {
grpclog.Warningf("xds: RDS response handler failed: %v", err)
return success
}
case clusterURL:
if err := v2c.handleCDSResponse(resp); err != nil {
grpclog.Warningf("xds: CDS response handler failed: %v", err)
return success
}
case endpointURL:
if err := v2c.handleEDSResponse(resp); err != nil {
grpclog.Warningf("xds: EDS response handler failed: %v", err)
Expand Down

0 comments on commit b7e6dbf

Please sign in to comment.