Skip to content

Commit

Permalink
Merge pull request #926 from prameshj/composite-part2
Browse files Browse the repository at this point in the history
Modify NEG libraries to use composite types.
  • Loading branch information
k8s-ci-robot authored Nov 9, 2019
2 parents 2351e2b + 3849ae4 commit 011cb28
Show file tree
Hide file tree
Showing 20 changed files with 305 additions and 236 deletions.
3 changes: 1 addition & 2 deletions pkg/backends/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package backends

import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
compute "google.golang.org/api/compute/v1"
api_v1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -71,7 +70,7 @@ type Linker interface {

// NEGGetter is an interface to retrieve NEG object
type NEGGetter interface {
GetNetworkEndpointGroup(name string, zone string) (*compute.NetworkEndpointGroup, error)
GetNetworkEndpointGroup(name string, zone string, version meta.Version) (*composite.NetworkEndpointGroup, error)
}

// ProbeProvider retrieves a probe struct given a nodePort
Expand Down
7 changes: 3 additions & 4 deletions pkg/backends/neg_linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ limitations under the License.
package backends

import (
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
befeatures "k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
Expand Down Expand Up @@ -45,7 +44,7 @@ func NewNEGLinker(

// Link implements Link.
func (l *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error {
var negs []*compute.NetworkEndpointGroup
var negs []*composite.NetworkEndpointGroup
var err error
for _, group := range groups {
// If the group key contains a name, then use that.
Expand All @@ -54,7 +53,7 @@ func (l *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error {
if negName == "" {
negName = sp.BackendName()
}
neg, err := l.negGetter.GetNetworkEndpointGroup(negName, group.Zone)
neg, err := l.negGetter.GetNetworkEndpointGroup(negName, group.Zone, utils.GetAPIVersionFromServicePort(&sp))
if err != nil {
return err
}
Expand Down Expand Up @@ -95,7 +94,7 @@ func (l *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error {
return nil
}

func getBackendsForNEGs(negs []*compute.NetworkEndpointGroup) []*composite.Backend {
func getBackendsForNEGs(negs []*composite.NetworkEndpointGroup) []*composite.Backend {
var backends []*composite.Backend
for _, neg := range negs {
b := &composite.Backend{
Expand Down
8 changes: 5 additions & 3 deletions pkg/backends/neg_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ limitations under the License.
package backends

import (
"k8s.io/ingress-gce/pkg/composite"
"strings"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/annotations"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
Expand Down Expand Up @@ -65,8 +66,9 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
linker.backendPool.Create(svcPort, "fake-healthcheck-link")

for _, key := range zones {
err := fakeNEG.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: defaultNamer.NEG(namespace, name, svcPort.Port),
err := fakeNEG.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Name: defaultNamer.NEG(namespace, name, svcPort.Port),
Version: meta.VersionGA,
}, key.Zone)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down
79 changes: 51 additions & 28 deletions pkg/composite/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3726,23 +3726,23 @@ func ListNetworkEndpoints(gceCloud *gce.Cloud, key *meta.Key, version meta.Versi

switch version {
case meta.VersionAlpha:
alphareq, err := req.ToAlpha()
if err != nil {
return nil, err
alphareq, reqerr := req.ToAlpha()
if reqerr != nil {
return nil, reqerr
}
klog.V(3).Infof("Listing alpha zonal NetworkEndpointGroup %v", key.Name)
gceObjs, err = gceCloud.Compute().AlphaNetworkEndpointGroups().ListNetworkEndpoints(ctx, key, alphareq, filter.None)
case meta.VersionBeta:
betareq, err := req.ToBeta()
if err != nil {
return nil, err
betareq, reqerr := req.ToBeta()
if reqerr != nil {
return nil, reqerr
}
klog.V(3).Infof("Listing beta zonal NetworkEndpointGroup %v", key.Name)
gceObjs, err = gceCloud.Compute().BetaNetworkEndpointGroups().ListNetworkEndpoints(ctx, key, betareq, filter.None)
default:
gareq, err := req.ToGA()
if err != nil {
return nil, err
gareq, reqerr := req.ToGA()
if reqerr != nil {
return nil, reqerr
}
klog.V(3).Infof("Listing ga zonal NetworkEndpointGroup %v", key.Name)
gceObjs, err = gceCloud.Compute().NetworkEndpointGroups().ListNetworkEndpoints(ctx, key, gareq, filter.None)
Expand All @@ -3761,43 +3761,66 @@ func ListNetworkEndpoints(gceCloud *gce.Cloud, key *meta.Key, version meta.Versi
return compositeObjs, nil
}

func AggregatedListNetworkEndpointGroup(gceCloud *gce.Cloud, version meta.Version) (map[string][]*NetworkEndpointGroup, error) {
func AggregatedListNetworkEndpointGroup(gceCloud *gce.Cloud, version meta.Version) (map[*meta.Key]*NetworkEndpointGroup, error) {
ctx, cancel := cloudprovider.ContextWithCallTimeout()
defer cancel()
mc := compositemetrics.NewMetricContext("NetworkEndpointGroup", "aggregateList", "", "", string(version))

compositeMap := make(map[*meta.Key]*NetworkEndpointGroup)
var gceObjs interface{}
compositeMap := make(map[string][]*NetworkEndpointGroup)
var err error

switch version {
case meta.VersionAlpha:
klog.V(3).Infof("Aggregate List of alpha zonal NetworkEndpointGroup")
gceObjs, err = gceCloud.Compute().AlphaNetworkEndpointGroups().AggregatedList(ctx, filter.None)
alphaMap, err := gceCloud.Compute().AlphaNetworkEndpointGroups().AggregatedList(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
// Convert from map to list
alphaList := []*computealpha.NetworkEndpointGroup{}
for _, val := range alphaMap {
alphaList = append(alphaList, val...)
}
gceObjs = alphaList
case meta.VersionBeta:
klog.V(3).Infof("Aggregate List of beta zonal NetworkEndpointGroup")
gceObjs, err = gceCloud.Compute().BetaNetworkEndpointGroups().AggregatedList(ctx, filter.None)
betaMap, err := gceCloud.Compute().BetaNetworkEndpointGroups().AggregatedList(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
// Convert from map to list
betaList := []*computebeta.NetworkEndpointGroup{}
for _, val := range betaMap {
betaList = append(betaList, val...)
}
gceObjs = betaList
default:
klog.V(3).Infof("Aggregate List of ga zonal NetworkEndpointGroup")
gceObjs, err = gceCloud.Compute().NetworkEndpointGroups().AggregatedList(ctx, filter.None)
gaMap, err := gceCloud.Compute().NetworkEndpointGroups().AggregatedList(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
// Convert from map to list
gaList := []*compute.NetworkEndpointGroup{}
for _, val := range gaMap {
gaList = append(gaList, val...)
}
gceObjs = gaList
}
compositeObjs, err := ToNetworkEndpointGroupList(gceObjs)
if err != nil {
return nil, mc.Observe(err)
}
gceMap, ok := gceObjs.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("Failed to convert gceObj to map[string]interface{}")
return nil, err
}
for keyStr, obj := range gceMap {
compositeObjs, err := ToNetworkEndpointGroupList(obj)
if err != nil {
return nil, err
}
for _, o := range compositeObjs {
o.Version = version
for _, obj := range compositeObjs {
obj.Version = version
resourceID, err := cloudprovider.ParseResourceURL(obj.SelfLink)
if err != nil || resourceID == nil || resourceID.Key == nil {
klog.Errorf("Failed to parse SelfLink - %s for obj %v, err %v", obj.SelfLink, obj, err)
continue
}
compositeMap[keyStr] = compositeObjs
compositeMap[resourceID.Key] = obj
}

return compositeMap, nil
}

Expand Down
79 changes: 51 additions & 28 deletions pkg/composite/gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,23 +681,23 @@ func {{.GetGroupResourceInfo.ListFuncName}}(gceCloud *gce.Cloud, key *meta.Key,
switch version {
case meta.VersionAlpha:
alphareq, err := req.ToAlpha()
if err != nil {
return nil, err
alphareq, reqerr := req.ToAlpha()
if reqerr != nil {
return nil, reqerr
}
klog.V(3).Infof("Listing alpha zonal {{.Name}} %v", key.Name)
gceObjs, err = gceCloud.Compute().Alpha{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.ListFuncName}}(ctx, key, alphareq, filter.None)
case meta.VersionBeta:
betareq, err := req.ToBeta()
if err != nil {
return nil, err
betareq, reqerr := req.ToBeta()
if reqerr != nil {
return nil, reqerr
}
klog.V(3).Infof("Listing beta zonal {{.Name}} %v", key.Name)
gceObjs, err = gceCloud.Compute().Beta{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.ListFuncName}}(ctx, key, betareq, filter.None)
default:
gareq, err := req.ToGA()
if err != nil {
return nil, err
gareq, reqerr := req.ToGA()
if reqerr != nil {
return nil, reqerr
}
klog.V(3).Infof("Listing ga zonal {{.Name}} %v", key.Name)
gceObjs, err = gceCloud.Compute().{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.ListFuncName}}(ctx, key, gareq, filter.None)
Expand All @@ -716,43 +716,66 @@ func {{.GetGroupResourceInfo.ListFuncName}}(gceCloud *gce.Cloud, key *meta.Key,
return compositeObjs, nil
}
func {{.GetGroupResourceInfo.AggListFuncName}}{{.GetGroupResourceInfo.AggListRespName}}(gceCloud *gce.Cloud, version meta.Version) (map[string][]*{{.GetGroupResourceInfo.AggListRespName}}, error) {
func {{.GetGroupResourceInfo.AggListFuncName}}{{.GetGroupResourceInfo.AggListRespName}}(gceCloud *gce.Cloud, version meta.Version) (map[*meta.Key]*{{.GetGroupResourceInfo.AggListRespName}}, error) {
ctx, cancel := cloudprovider.ContextWithCallTimeout()
defer cancel()
mc := compositemetrics.NewMetricContext("{{.Name}}", "aggregateList", "", "", string(version))
compositeMap := make(map[*meta.Key]*{{.GetGroupResourceInfo.AggListRespName}})
var gceObjs interface{}
compositeMap := make(map[string][]*{{.GetGroupResourceInfo.AggListRespName}})
var err error
switch version {
case meta.VersionAlpha:
klog.V(3).Infof("Aggregate List of alpha zonal {{.Name}}")
gceObjs, err = gceCloud.Compute().Alpha{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None)
alphaMap, err := gceCloud.Compute().Alpha{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
// Convert from map to list
alphaList := []*computealpha.{{.GetGroupResourceInfo.AggListRespName}}{}
for _, val := range alphaMap {
alphaList = append(alphaList, val...)
}
gceObjs = alphaList
case meta.VersionBeta:
klog.V(3).Infof("Aggregate List of beta zonal {{.Name}}")
gceObjs, err = gceCloud.Compute().Beta{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None)
betaMap, err := gceCloud.Compute().Beta{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
// Convert from map to list
betaList := []*computebeta.{{.GetGroupResourceInfo.AggListRespName}}{}
for _, val := range betaMap {
betaList = append(betaList, val...)
}
gceObjs = betaList
default:
klog.V(3).Infof("Aggregate List of ga zonal {{.Name}}")
gceObjs, err = gceCloud.Compute().{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None)
gaMap, err := gceCloud.Compute().{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
// Convert from map to list
gaList := []*compute.{{.GetGroupResourceInfo.AggListRespName}}{}
for _, val := range gaMap {
gaList = append(gaList, val...)
}
gceObjs = gaList
}
compositeObjs, err := To{{.GetGroupResourceInfo.AggListRespName}}List(gceObjs)
if err != nil {
return nil, mc.Observe(err)
}
gceMap, ok := gceObjs.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("Failed to convert gceObj to map[string]interface{}")
return nil, err
}
for keyStr, obj := range gceMap {
compositeObjs, err := To{{.GetGroupResourceInfo.AggListRespName}}List(obj)
if err != nil {
return nil, err
}
for _, o := range compositeObjs {
o.Version = version
for _, obj := range compositeObjs {
obj.Version = version
resourceID, err := cloudprovider.ParseResourceURL(obj.SelfLink)
if err != nil || resourceID == nil || resourceID.Key == nil {
klog.Errorf("Failed to parse SelfLink - %s for obj %v, err %v", obj.SelfLink, obj, err)
continue
}
compositeMap[keyStr] = compositeObjs
compositeMap[resourceID.Key] = obj
}
return compositeMap, nil
}
{{end}} {{/*IsGroupResourceService*/}}
Expand Down
7 changes: 4 additions & 3 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package neg

import (
"fmt"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"sync"

"k8s.io/api/core/v1"
Expand Down Expand Up @@ -254,7 +255,7 @@ func (manager *syncerManager) garbageCollectSyncer() {
func (manager *syncerManager) garbageCollectNEG() error {
// Retrieve aggregated NEG list from cloud
// Compare against svcPortMap and Remove unintended NEGs by best effort
zoneNEGList, err := manager.cloud.AggregatedListNetworkEndpointGroup()
zoneNEGList, err := manager.cloud.AggregatedListNetworkEndpointGroup(meta.VersionGA)
if err != nil {
return fmt.Errorf("failed to retrieve aggregated NEG list: %v", err)
}
Expand Down Expand Up @@ -294,13 +295,13 @@ func (manager *syncerManager) garbageCollectNEG() error {

// ensureDeleteNetworkEndpointGroup ensures neg is delete from zone
func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string) error {
_, err := manager.cloud.GetNetworkEndpointGroup(name, zone)
_, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA)
if err != nil {
// Assume error is caused by not existing
return nil
}
klog.V(2).Infof("Deleting NEG %q in %q.", name, zone)
return manager.cloud.DeleteNetworkEndpointGroup(name, zone)
return manager.cloud.DeleteNetworkEndpointGroup(name, zone, meta.VersionGA)
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
Expand Down
9 changes: 6 additions & 3 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"testing"
"time"

"google.golang.org/api/compute/v1"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"k8s.io/ingress-gce/pkg/composite"

apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -349,7 +351,8 @@ func TestGarbageCollectionNEG(t *testing.T) {

for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType} {
negName := manager.namer.NEG("test", "test", 80)
manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
manager.cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Version: meta.VersionGA,
Name: negName,
NetworkEndpointType: string(networkEndpointType),
}, negtypes.TestZone1)
Expand All @@ -358,7 +361,7 @@ func TestGarbageCollectionNEG(t *testing.T) {
t.Fatalf("Failed to GC: %v", err)
}

negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1)
negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1, meta.VersionGA)
for _, neg := range negs {
if neg.Name == negName {
t.Errorf("Expect NEG %q to be GCed.", negName)
Expand Down
Loading

0 comments on commit 011cb28

Please sign in to comment.