From 93022bbd71a3e0589ea620efadff8eb9eb065481 Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Wed, 30 Oct 2019 23:54:02 -0700 Subject: [PATCH 1/3] Modify NEG libraries to use composite types Modified syncer_test to use only one v1 import. --- pkg/backends/interfaces.go | 3 +- pkg/backends/neg_linker.go | 7 +- pkg/backends/neg_linker_test.go | 8 +- pkg/neg/manager.go | 7 +- pkg/neg/manager_test.go | 9 ++- pkg/neg/readiness/poller.go | 6 +- pkg/neg/readiness/poller_test.go | 13 +++- pkg/neg/syncers/transaction.go | 20 ++--- pkg/neg/syncers/transaction_test.go | 15 ++-- pkg/neg/syncers/utils.go | 22 +++--- pkg/neg/syncers/utils_test.go | 83 ++++++++++++--------- pkg/neg/types/cloudprovideradapter.go | 87 ++++++++-------------- pkg/neg/types/cloudprovideradapter_test.go | 15 ++-- pkg/neg/types/fakes.go | 38 +++++----- pkg/neg/types/interfaces.go | 19 ++--- pkg/neg/types/mock.go | 5 +- pkg/neg/types/types.go | 14 ++++ pkg/utils/serviceport.go | 12 +++ 18 files changed, 203 insertions(+), 180 deletions(-) diff --git a/pkg/backends/interfaces.go b/pkg/backends/interfaces.go index 9aad991e24..c85d59383f 100644 --- a/pkg/backends/interfaces.go +++ b/pkg/backends/interfaces.go @@ -18,7 +18,6 @@ package backends import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - compute "google.golang.org/api/compute/v1" api_v1 "k8s.io/api/core/v1" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/utils" @@ -71,7 +70,7 @@ type Linker interface { // NEGGetter is an interface to retrieve NEG object type NEGGetter interface { - GetNetworkEndpointGroup(name string, zone string) (*compute.NetworkEndpointGroup, error) + GetNetworkEndpointGroup(name string, zone string, version meta.Version) (*composite.NetworkEndpointGroup, error) } // ProbeProvider retrieves a probe struct given a nodePort diff --git a/pkg/backends/neg_linker.go b/pkg/backends/neg_linker.go index b5218f1e3c..788d4c1893 100644 --- a/pkg/backends/neg_linker.go +++ b/pkg/backends/neg_linker.go @@ -14,7 +14,6 @@ limitations under the License. package backends import ( - "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/util/sets" befeatures "k8s.io/ingress-gce/pkg/backends/features" "k8s.io/ingress-gce/pkg/composite" @@ -45,7 +44,7 @@ func NewNEGLinker( // Link implements Link. func (l *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error { - var negs []*compute.NetworkEndpointGroup + var negs []*composite.NetworkEndpointGroup var err error for _, group := range groups { // If the group key contains a name, then use that. @@ -54,7 +53,7 @@ func (l *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error { if negName == "" { negName = sp.BackendName() } - neg, err := l.negGetter.GetNetworkEndpointGroup(negName, group.Zone) + neg, err := l.negGetter.GetNetworkEndpointGroup(negName, group.Zone, utils.GetAPIVersionFromServicePort(&sp)) if err != nil { return err } @@ -95,7 +94,7 @@ func (l *negLinker) Link(sp utils.ServicePort, groups []GroupKey) error { return nil } -func getBackendsForNEGs(negs []*compute.NetworkEndpointGroup) []*composite.Backend { +func getBackendsForNEGs(negs []*composite.NetworkEndpointGroup) []*composite.Backend { var backends []*composite.Backend for _, neg := range negs { b := &composite.Backend{ diff --git a/pkg/backends/neg_linker_test.go b/pkg/backends/neg_linker_test.go index 96a822941d..98d08dd90f 100644 --- a/pkg/backends/neg_linker_test.go +++ b/pkg/backends/neg_linker_test.go @@ -14,12 +14,13 @@ limitations under the License. package backends import ( + "k8s.io/ingress-gce/pkg/composite" "strings" "testing" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock" - compute "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/ingress-gce/pkg/annotations" negtypes "k8s.io/ingress-gce/pkg/neg/types" @@ -65,8 +66,9 @@ func TestLinkBackendServiceToNEG(t *testing.T) { linker.backendPool.Create(svcPort, "fake-healthcheck-link") for _, key := range zones { - err := fakeNEG.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ - Name: defaultNamer.NEG(namespace, name, svcPort.Port), + err := fakeNEG.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Name: defaultNamer.NEG(namespace, name, svcPort.Port), + Version: meta.VersionGA, }, key.Zone) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 3d36d51cc4..621a6a904b 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -18,6 +18,7 @@ package neg import ( "fmt" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "sync" "k8s.io/api/core/v1" @@ -254,7 +255,7 @@ func (manager *syncerManager) garbageCollectSyncer() { func (manager *syncerManager) garbageCollectNEG() error { // Retrieve aggregated NEG list from cloud // Compare against svcPortMap and Remove unintended NEGs by best effort - zoneNEGList, err := manager.cloud.AggregatedListNetworkEndpointGroup() + zoneNEGList, err := manager.cloud.AggregatedListNetworkEndpointGroup(meta.VersionGA) if err != nil { return fmt.Errorf("failed to retrieve aggregated NEG list: %v", err) } @@ -294,13 +295,13 @@ func (manager *syncerManager) garbageCollectNEG() error { // ensureDeleteNetworkEndpointGroup ensures neg is delete from zone func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string) error { - _, err := manager.cloud.GetNetworkEndpointGroup(name, zone) + _, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA) if err != nil { // Assume error is caused by not existing return nil } klog.V(2).Infof("Deleting NEG %q in %q.", name, zone) - return manager.cloud.DeleteNetworkEndpointGroup(name, zone) + return manager.cloud.DeleteNetworkEndpointGroup(name, zone, meta.VersionGA) } // getSyncerKey encodes a service namespace, name, service port and targetPort into a string key diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 0becb695a3..846b8414a5 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -21,7 +21,9 @@ import ( "testing" "time" - "google.golang.org/api/compute/v1" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/ingress-gce/pkg/composite" + apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -349,7 +351,8 @@ func TestGarbageCollectionNEG(t *testing.T) { for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType} { negName := manager.namer.NEG("test", "test", 80) - manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ + manager.cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: meta.VersionGA, Name: negName, NetworkEndpointType: string(networkEndpointType), }, negtypes.TestZone1) @@ -358,7 +361,7 @@ func TestGarbageCollectionNEG(t *testing.T) { t.Fatalf("Failed to GC: %v", err) } - negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1) + negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1, meta.VersionGA) for _, neg := range negs { if neg.Name == negName { t.Errorf("Expect NEG %q to be GCed.", negName) diff --git a/pkg/neg/readiness/poller.go b/pkg/neg/readiness/poller.go index f0c5c7647f..a50d9c1dd4 100644 --- a/pkg/neg/readiness/poller.go +++ b/pkg/neg/readiness/poller.go @@ -18,10 +18,10 @@ package readiness import ( "fmt" - compute "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/composite" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog" "strconv" @@ -139,7 +139,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) { var errList []error klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone) // TODO(freehan): filter the NEs that are in interest once the API supports it - res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true) + res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion()) if err != nil { return true, err } @@ -167,7 +167,7 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) { // processHealthStatus evaluates the health status of the input network endpoint. // Assumes p.lock is held when calling this method. -func (p *poller) processHealthStatus(key negMeta, healthStatus *compute.NetworkEndpointWithHealthStatus) (healthy bool, err error) { +func (p *poller) processHealthStatus(key negMeta, healthStatus *composite.NetworkEndpointWithHealthStatus) (healthy bool, err error) { ne := negtypes.NetworkEndpoint{ IP: healthStatus.NetworkEndpoint.IpAddress, Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10), diff --git a/pkg/neg/readiness/poller_test.go b/pkg/neg/readiness/poller_test.go index 1ab3e95019..0af3b43cb4 100644 --- a/pkg/neg/readiness/poller_test.go +++ b/pkg/neg/readiness/poller_test.go @@ -17,6 +17,7 @@ limitations under the License. package readiness import ( + "k8s.io/ingress-gce/pkg/composite" "net" "strconv" "testing" @@ -357,7 +358,7 @@ func TestPoll(t *testing.T) { } // create NEG, but with no endpoint - negCloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{Name: negName, Zone: zone}, zone) + negCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{Name: negName, Zone: zone, Version: meta.VersionGA}, zone) retry, err = poller.Poll(key) if err != nil { t.Errorf("Does not expect err, but got %v", err) @@ -367,12 +368,12 @@ func TestPoll(t *testing.T) { } // add NE to the NEG, but NE not healthy - ne := &compute.NetworkEndpoint{ + ne := &composite.NetworkEndpoint{ IpAddress: ip, Port: port, Instance: instance, } - negCloud.AttachNetworkEndpoints(negName, zone, []*compute.NetworkEndpoint{ne}) + negCloud.AttachNetworkEndpoints(negName, zone, []*composite.NetworkEndpoint{ne}, meta.VersionGA) retry, err = poller.Poll(key) if err != nil { t.Errorf("Does not expect err, but got %v", err) @@ -383,7 +384,11 @@ func TestPoll(t *testing.T) { // add NE with healthy status negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), negtypes.NetworkEndpointEntry{ - NetworkEndpoint: ne, + NetworkEndpoint: &compute.NetworkEndpoint{ + IpAddress: ip, + Port: port, + Instance: instance, + }, Healths: []*compute.HealthStatusForNetworkEndpoint{ { BackendService: &compute.BackendServiceReference{ diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 2b3fd01eee..32f70acf6c 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -21,12 +21,12 @@ import ( "fmt" - "google.golang.org/api/compute/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/neg/readiness" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog" @@ -135,7 +135,7 @@ func (s *transactionSyncer) syncInternal() error { return err } - currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.negName, s.zoneGetter, s.cloud) + currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.negName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion()) if err != nil { return err } @@ -176,7 +176,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { var errList []error for _, zone := range zones { - if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.NegSyncerKey.NegType, s.cloud, s.serviceLister, s.recorder); err != nil { + if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.NegSyncerKey.NegType, s.cloud, s.serviceLister, s.recorder, s.NegSyncerKey.GetAPIVersion()); err != nil { errList = append(errList, err) } } @@ -228,13 +228,13 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m } // attachNetworkEndpoints creates go routine to run operations for attaching network endpoints -func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint) { +func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { klog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpointMap), s.NegSyncerKey.String(), s.negName, zone) go s.operationInternal(attachOp, zone, networkEndpointMap) } // detachNetworkEndpoints creates go routine to run operations for detaching network endpoints -func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint) { +func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { klog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpointMap), s.NegSyncerKey.String(), s.negName, zone) go s.operationInternal(detachOp, zone, networkEndpointMap) } @@ -242,18 +242,18 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM // operationInternal executes NEG API call and commits the transactions // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync -func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint) { +func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { var err error - networkEndpoints := []*compute.NetworkEndpoint{} + networkEndpoints := []*composite.NetworkEndpoint{} for _, ne := range networkEndpointMap { networkEndpoints = append(networkEndpoints, ne) } if operation == attachOp { - err = s.cloud.AttachNetworkEndpoints(s.negName, zone, networkEndpoints) + err = s.cloud.AttachNetworkEndpoints(s.negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion()) } if operation == detachOp { - err = s.cloud.DetachNetworkEndpoints(s.negName, zone, networkEndpoints) + err = s.cloud.DetachNetworkEndpoints(s.negName, zone, networkEndpoints, s.NegSyncerKey.GetAPIVersion()) } if err == nil { @@ -276,7 +276,7 @@ func (s *transactionSyncer) recordEvent(eventType, reason, eventDesc string) { // It will trigger syncer retry in the following conditions: // 1. Any of the transaction committed needed to be reconciled // 2. Input error was not nil -func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint) { +func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { s.syncLock.Lock() defer s.syncLock.Unlock() diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 2170c653d1..c3d9b3ca8c 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "google.golang.org/api/compute/v1" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake" + "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/neg/readiness" negtypes "k8s.io/ingress-gce/pkg/neg/types" @@ -148,7 +149,7 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { } // Verify the NEGs are created as expected - ret, _ := transactionSyncer.cloud.AggregatedListNetworkEndpointGroup() + ret, _ := transactionSyncer.cloud.AggregatedListNetworkEndpointGroup(meta.VersionGA) expectZones := []string{testZone1, testZone2} for _, zone := range expectZones { negs, ok := ret[zone] @@ -177,7 +178,7 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) { } for zone, endpoints := range tc.expectEndpoints { - list, err := fakeCloud.ListNetworkEndpoints(transactionSyncer.negName, zone, false) + list, err := fakeCloud.ListNetworkEndpoints(transactionSyncer.negName, zone, false, meta.VersionGA) if err != nil { t.Errorf("For case %q,, endpointSets error == nil, but got %v", tc.desc, err) } @@ -208,7 +209,7 @@ func TestCommitTransaction(t *testing.T) { testCases := []struct { desc string err error - endpointMap map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint + endpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint table func() networkEndpointTransactionTable expect func() networkEndpointTransactionTable expectSyncCount int @@ -218,7 +219,7 @@ func TestCommitTransaction(t *testing.T) { { "empty inputs", nil, - map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint{}, + map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{}, func() networkEndpointTransactionTable { return NewTransactionTable() }, func() networkEndpointTransactionTable { return NewTransactionTable() }, 1, @@ -277,7 +278,7 @@ func TestCommitTransaction(t *testing.T) { { "error and retry", fmt.Errorf("dummy error"), - map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint{}, + map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{}, func() networkEndpointTransactionTable { table := NewTransactionTable() generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080") @@ -894,7 +895,7 @@ func unionEndpointMap(m1, m2 negtypes.EndpointPodMap) negtypes.EndpointPodMap { return m1 } -func generateEndpointBatch(endpointSet negtypes.NetworkEndpointSet) map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint { +func generateEndpointBatch(endpointSet negtypes.NetworkEndpointSet) map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint { ret, _ := makeEndpointBatch(endpointSet) return ret } diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 5782fa919f..547c76b81d 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -18,11 +18,12 @@ package syncers import ( "fmt" + "k8s.io/ingress-gce/pkg/composite" "strconv" "strings" "time" - "google.golang.org/api/compute/v1" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -112,8 +113,8 @@ func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Serv } // ensureNetworkEndpointGroup ensures corresponding NEG is configured correctly in the specified zone. -func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, networkEndpointType negtypes.NetworkEndpointType, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error { - neg, err := cloud.GetNetworkEndpointGroup(negName, zone) +func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, networkEndpointType negtypes.NetworkEndpointType, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder, version meta.Version) error { + neg, err := cloud.GetNetworkEndpointGroup(negName, zone, version) if err != nil { // Most likely to be caused by non-existed NEG klog.V(4).Infof("Error while retriving %q in zone %q: %v", negName, zone, err) @@ -129,7 +130,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService !utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL())) { needToCreate = true klog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", negName, zone) - err = cloud.DeleteNetworkEndpointGroup(negName, zone) + err = cloud.DeleteNetworkEndpointGroup(negName, zone, version) if err != nil { return err } @@ -149,7 +150,8 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService default: subnetwork = cloud.SubnetworkURL() } - err = cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ + err = cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: version, Name: negName, NetworkEndpointType: string(networkEndpointType), Network: cloud.NetworkURL(), @@ -246,7 +248,7 @@ func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.Zo } // retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map -func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud) (map[string]negtypes.NetworkEndpointSet, error) { +func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version) (map[string]negtypes.NetworkEndpointSet, error) { zones, err := zoneGetter.ListZones() if err != nil { return nil, err @@ -255,7 +257,7 @@ func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes. zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} for _, zone := range zones { zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet() - networkEndpointsWithHealthStatus, err := cloud.ListNetworkEndpoints(negName, zone, false) + networkEndpointsWithHealthStatus, err := cloud.ListNetworkEndpoints(negName, zone, false, version) if err != nil { return nil, err } @@ -268,8 +270,8 @@ func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes. // makeEndpointBatch return a batch of endpoint from the input and remove the endpoints from input set // The return map has the encoded endpoint as key and GCE network endpoint object as value -func makeEndpointBatch(endpoints negtypes.NetworkEndpointSet) (map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint, error) { - endpointBatch := map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint{} +func makeEndpointBatch(endpoints negtypes.NetworkEndpointSet) (map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, error) { + endpointBatch := map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{} for i := 0; i < MAX_NETWORK_ENDPOINTS_PER_BATCH; i++ { networkEndpoint, ok := endpoints.PopAny() @@ -282,7 +284,7 @@ func makeEndpointBatch(endpoints negtypes.NetworkEndpointSet) (map[negtypes.Netw return nil, fmt.Errorf("failed to decode endpoint port %v: %v", networkEndpoint, err) } - endpointBatch[networkEndpoint] = &compute.NetworkEndpoint{ + endpointBatch[networkEndpoint] = &composite.NetworkEndpoint{ Instance: networkEndpoint.Node, IpAddress: networkEndpoint.IP, Port: int64(portNum), diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 5df4a14329..038a4d129b 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -17,20 +17,18 @@ limitations under the License. package syncers import ( + "fmt" "reflect" "strconv" "testing" - "fmt" - "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - "google.golang.org/api/compute/v1" - "k8s.io/api/core/v1" - apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/ingress-gce/pkg/composite" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/legacy-cloud-providers/gce" ) @@ -313,6 +311,7 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { enableNonGCPMode bool networkEndpointType negtypes.NetworkEndpointType expectedSubnetwork string + apiVersion meta.Version }{ { description: "Create NEG of type GCE_VM_IP_PORT", @@ -320,6 +319,7 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { enableNonGCPMode: false, networkEndpointType: negtypes.VmIpPortEndpointType, expectedSubnetwork: testSubnetwork, + apiVersion: meta.VersionGA, }, { description: "Create NEG of type NON_GCP_PRIVATE_IP_PORT", @@ -327,6 +327,15 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { enableNonGCPMode: true, networkEndpointType: negtypes.NonGCPPrivateEndpointType, expectedSubnetwork: "", + apiVersion: meta.VersionGA, + }, + { + description: "Create NEG of type GCE_VM_PRIMARY_IP", + negName: "gcp-ip-neg", + enableNonGCPMode: false, + networkEndpointType: negtypes.VmPrimaryIpEndpointType, + expectedSubnetwork: testSubnetwork, + apiVersion: meta.VersionAlpha, }, } for _, tc := range testCases { @@ -340,9 +349,10 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { fakeCloud, nil, nil, + tc.apiVersion, ) - neg, err := fakeCloud.GetNetworkEndpointGroup(tc.negName, testZone) + neg, err := fakeCloud.GetNetworkEndpointGroup(tc.negName, testZone, tc.apiVersion) if err != nil { t.Errorf("Failed to retrieve NEG %q: %v", tc.negName, err) } @@ -366,6 +376,7 @@ func TestEnsureNetworkEndpointGroup(t *testing.T) { fakeCloud, nil, nil, + tc.apiVersion, ) if err != nil { @@ -526,21 +537,21 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { { desc: "neg only exists in one of the zone", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{Name: testNegName}, negtypes.TestZone1) + cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{Name: testNegName, Version: meta.VersionGA}, negtypes.TestZone1) }, expectErr: true, }, { desc: "neg only exists in one of the zone plus irrelevant negs", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{Name: irrelevantNegName}, negtypes.TestZone2) + cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{Name: irrelevantNegName, Version: meta.VersionGA}, negtypes.TestZone2) }, expectErr: true, }, { desc: "empty negs exists in both zones", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{Name: testNegName}, negtypes.TestZone2) + cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{Name: testNegName, Version: meta.VersionGA}, negtypes.TestZone2) }, expect: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet(), @@ -551,13 +562,13 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { { desc: "one empty and one non-empty negs", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.AttachNetworkEndpoints(testNegName, negtypes.TestZone1, []*compute.NetworkEndpoint{ + cloud.AttachNetworkEndpoints(testNegName, negtypes.TestZone1, []*composite.NetworkEndpoint{ { Instance: negtypes.TestInstance1, IpAddress: testIP1, Port: testPort, }, - }) + }, meta.VersionGA) }, expect: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: testIP1, Node: negtypes.TestInstance1, Port: strconv.Itoa(int(testPort))}), @@ -568,13 +579,13 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { { desc: "one neg with multiple endpoints", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.AttachNetworkEndpoints(testNegName, negtypes.TestZone1, []*compute.NetworkEndpoint{ + cloud.AttachNetworkEndpoints(testNegName, negtypes.TestZone1, []*composite.NetworkEndpoint{ { Instance: negtypes.TestInstance2, IpAddress: testIP2, Port: testPort, }, - }) + }, meta.VersionGA) }, expect: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( @@ -588,7 +599,7 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { { desc: "both negs with multiple endpoints", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.AttachNetworkEndpoints(testNegName, negtypes.TestZone2, []*compute.NetworkEndpoint{ + cloud.AttachNetworkEndpoints(testNegName, negtypes.TestZone2, []*composite.NetworkEndpoint{ { Instance: negtypes.TestInstance3, IpAddress: testIP3, @@ -599,7 +610,7 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { IpAddress: testIP4, Port: testPort, }, - }) + }, meta.VersionGA) }, expect: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( @@ -616,13 +627,13 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { { desc: "irrelevant neg", mutate: func(cloud negtypes.NetworkEndpointGroupCloud) { - cloud.AttachNetworkEndpoints(irrelevantNegName, negtypes.TestZone2, []*compute.NetworkEndpoint{ + cloud.AttachNetworkEndpoints(irrelevantNegName, negtypes.TestZone2, []*composite.NetworkEndpoint{ { Instance: negtypes.TestInstance3, IpAddress: testIP4, Port: testPort, }, - }) + }, meta.VersionGA) }, expect: map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( @@ -640,7 +651,7 @@ func TestRetrieveExistingZoneNetworkEndpointMap(t *testing.T) { for _, tc := range testCases { tc.mutate(negCloud) - out, err := retrieveExistingZoneNetworkEndpointMap(negName, zoneGetter, negCloud) + out, err := retrieveExistingZoneNetworkEndpointMap(negName, zoneGetter, negCloud, meta.VersionGA) if tc.expectErr { if err == nil { @@ -807,15 +818,15 @@ func TestShouldPodBeInNeg(t *testing.T) { } -func genTestEndpoints(num int) (negtypes.NetworkEndpointSet, map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint) { +func genTestEndpoints(num int) (negtypes.NetworkEndpointSet, map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { endpointSet := negtypes.NewNetworkEndpointSet() - endpointMap := map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint{} + endpointMap := map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint{} ip := "1.2.3.4" instance := "instance" for port := 0; port < num; port++ { key := negtypes.NetworkEndpoint{IP: ip, Node: instance, Port: strconv.Itoa(port)} endpointSet.Insert(key) - endpointMap[key] = &compute.NetworkEndpoint{ + endpointMap[key] = &composite.NetworkEndpoint{ IpAddress: ip, Instance: instance, Port: int64(port), @@ -829,19 +840,19 @@ func networkEndpointFromEncodedEndpoint(encodedEndpoint string) negtypes.Network return negtypes.NetworkEndpoint{IP: ip, Node: node, Port: port} } -func getDefaultEndpoint() *apiv1.Endpoints { +func getDefaultEndpoint() *v1.Endpoints { instance1 := negtypes.TestInstance1 instance2 := negtypes.TestInstance2 instance3 := negtypes.TestInstance3 instance4 := negtypes.TestInstance4 - return &apiv1.Endpoints{ + return &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: testServiceName, Namespace: testServiceNamespace, }, - Subsets: []apiv1.EndpointSubset{ + Subsets: []v1.EndpointSubset{ { - Addresses: []apiv1.EndpointAddress{ + Addresses: []v1.EndpointAddress{ { IP: "10.100.1.1", NodeName: &instance1, @@ -875,7 +886,7 @@ func getDefaultEndpoint() *apiv1.Endpoints { }, }, }, - NotReadyAddresses: []apiv1.EndpointAddress{ + NotReadyAddresses: []v1.EndpointAddress{ { IP: "10.100.1.3", NodeName: &instance1, @@ -893,16 +904,16 @@ func getDefaultEndpoint() *apiv1.Endpoints { }, }, }, - Ports: []apiv1.EndpointPort{ + Ports: []v1.EndpointPort{ { Name: "", Port: int32(80), - Protocol: apiv1.ProtocolTCP, + Protocol: v1.ProtocolTCP, }, }, }, { - Addresses: []apiv1.EndpointAddress{ + Addresses: []v1.EndpointAddress{ { IP: "10.100.2.2", NodeName: &instance2, @@ -920,7 +931,7 @@ func getDefaultEndpoint() *apiv1.Endpoints { }, }, }, - NotReadyAddresses: []apiv1.EndpointAddress{ + NotReadyAddresses: []v1.EndpointAddress{ { IP: "10.100.4.3", NodeName: &instance4, @@ -930,16 +941,16 @@ func getDefaultEndpoint() *apiv1.Endpoints { }, }, }, - Ports: []apiv1.EndpointPort{ + Ports: []v1.EndpointPort{ { Name: testNamedPort, Port: int32(81), - Protocol: apiv1.ProtocolTCP, + Protocol: v1.ProtocolTCP, }, }, }, { - Addresses: []apiv1.EndpointAddress{ + Addresses: []v1.EndpointAddress{ { IP: "10.100.3.2", NodeName: &instance3, @@ -957,7 +968,7 @@ func getDefaultEndpoint() *apiv1.Endpoints { }, }, }, - NotReadyAddresses: []apiv1.EndpointAddress{ + NotReadyAddresses: []v1.EndpointAddress{ { IP: "10.100.4.4", NodeName: &instance4, @@ -967,11 +978,11 @@ func getDefaultEndpoint() *apiv1.Endpoints { }, }, }, - Ports: []apiv1.EndpointPort{ + Ports: []v1.EndpointPort{ { Name: testNamedPort, Port: int32(8081), - Protocol: apiv1.ProtocolTCP, + Protocol: v1.ProtocolTCP, }, }, }, diff --git a/pkg/neg/types/cloudprovideradapter.go b/pkg/neg/types/cloudprovideradapter.go index 4475103a29..d80fd5f108 100644 --- a/pkg/neg/types/cloudprovideradapter.go +++ b/pkg/neg/types/cloudprovideradapter.go @@ -17,12 +17,8 @@ limitations under the License. package types import ( - "strings" - - "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" - "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/filter" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - "google.golang.org/api/compute/v1" + "k8s.io/ingress-gce/pkg/composite" "k8s.io/klog" "k8s.io/legacy-cloud-providers/gce" ) @@ -37,7 +33,7 @@ const ( // NewAdapter takes a Cloud and returns a NetworkEndpointGroupCloud. func NewAdapter(g *gce.Cloud) NetworkEndpointGroupCloud { return &cloudProviderAdapter{ - c: g.Compute(), + c: g, networkURL: g.NetworkURL(), subnetworkURL: g.SubnetworkURL(), } @@ -46,104 +42,79 @@ func NewAdapter(g *gce.Cloud) NetworkEndpointGroupCloud { // cloudProviderAdapter is a temporary shim to consolidate accesses to // Cloud and push them outside of this package. type cloudProviderAdapter struct { - c cloud.Cloud + c *gce.Cloud networkURL string subnetworkURL string } // GetNetworkEndpointGroup inmplements NetworkEndpointGroupCloud. -func (a *cloudProviderAdapter) GetNetworkEndpointGroup(name string, zone string) (*compute.NetworkEndpointGroup, error) { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() +func (a *cloudProviderAdapter) GetNetworkEndpointGroup(name string, zone string, version meta.Version) (*composite.NetworkEndpointGroup, error) { + return composite.GetNetworkEndpointGroup(a.c, meta.ZonalKey(name, zone), version) - return a.c.NetworkEndpointGroups().Get(ctx, meta.ZonalKey(name, zone)) } // ListNetworkEndpointGroup implements NetworkEndpointGroupCloud. -func (a *cloudProviderAdapter) ListNetworkEndpointGroup(zone string) ([]*compute.NetworkEndpointGroup, error) { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - - return a.c.NetworkEndpointGroups().List(ctx, zone, filter.None) +func (a *cloudProviderAdapter) ListNetworkEndpointGroup(zone string, version meta.Version) ([]*composite.NetworkEndpointGroup, error) { + return composite.ListNetworkEndpointGroups(a.c, meta.ZonalKey("", zone), version) } // AggregatedListNetworkEndpointGroup returns a map of zone -> endpoint group. -func (a *cloudProviderAdapter) AggregatedListNetworkEndpointGroup() (map[string][]*compute.NetworkEndpointGroup, error) { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - +func (a *cloudProviderAdapter) AggregatedListNetworkEndpointGroup(version meta.Version) (map[string][]*composite.NetworkEndpointGroup, error) { // TODO: filter for the region the cluster is in. - all, err := a.c.NetworkEndpointGroups().AggregatedList(ctx, filter.None) + all, err := composite.AggregatedListNetworkEndpointGroup(a.c, version) if err != nil { return nil, err } - ret := map[string][]*compute.NetworkEndpointGroup{} - for key, byZone := range all { + ret := map[string][]*composite.NetworkEndpointGroup{} + for key, obj := range all { // key is scope // zonal key is "zones/" // regional key is "regions/" // global key is "global" // TODO: use cloud provider meta.KeyType and scope name as key - parts := strings.Split(key, "/") - if len(parts) == 1 && parts[0] == aggregatedListGlobalKey { - klog.V(4).Infof("Ignoring key %q as it is global", key) + if key.Type() == meta.Global { + klog.V(4).Infof("Ignoring key %v as it is global", key) continue } - if len(parts) != 2 || parts[0] != aggregatedListZonalKeyPrefix { - klog.Warningf("Key %q is not in a known format, ignoring", key) + if key.Zone == "" { + klog.Warningf("Key %v does not have zone populated, ignoring", key) continue } - zone := parts[1] - ret[zone] = append(ret[zone], byZone...) + ret[key.Zone] = append(ret[key.Zone], obj) } return ret, nil } // CreateNetworkEndpointGroup implements NetworkEndpointGroupCloud. -func (a *cloudProviderAdapter) CreateNetworkEndpointGroup(neg *compute.NetworkEndpointGroup, zone string) error { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - - return a.c.NetworkEndpointGroups().Insert(ctx, meta.ZonalKey(neg.Name, zone), neg) +func (a *cloudProviderAdapter) CreateNetworkEndpointGroup(neg *composite.NetworkEndpointGroup, zone string) error { + return composite.CreateNetworkEndpointGroup(a.c, meta.ZonalKey(neg.Name, zone), neg) } // DeleteNetworkEndpointGroup implements NetworkEndpointGroupCloud. -func (a *cloudProviderAdapter) DeleteNetworkEndpointGroup(name string, zone string) error { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - - return a.c.NetworkEndpointGroups().Delete(ctx, meta.ZonalKey(name, zone)) +func (a *cloudProviderAdapter) DeleteNetworkEndpointGroup(name string, zone string, version meta.Version) error { + return composite.DeleteNetworkEndpointGroup(a.c, meta.ZonalKey(name, zone), version) } // AttachNetworkEndpoints implements NetworkEndpointGroupCloud. -func (a cloudProviderAdapter) AttachNetworkEndpoints(name, zone string, endpoints []*compute.NetworkEndpoint) error { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - - req := &compute.NetworkEndpointGroupsAttachEndpointsRequest{NetworkEndpoints: endpoints} - return a.c.NetworkEndpointGroups().AttachNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req) +func (a cloudProviderAdapter) AttachNetworkEndpoints(name, zone string, endpoints []*composite.NetworkEndpoint, version meta.Version) error { + req := &composite.NetworkEndpointGroupsAttachEndpointsRequest{NetworkEndpoints: endpoints} + return composite.AttachNetworkEndpoints(a.c, meta.ZonalKey(name, zone), version, req) } // DetachNetworkEndpoints implements NetworkEndpointGroupCloud. -func (a *cloudProviderAdapter) DetachNetworkEndpoints(name, zone string, endpoints []*compute.NetworkEndpoint) error { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - - req := &compute.NetworkEndpointGroupsDetachEndpointsRequest{NetworkEndpoints: endpoints} - return a.c.NetworkEndpointGroups().DetachNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req) +func (a *cloudProviderAdapter) DetachNetworkEndpoints(name, zone string, endpoints []*composite.NetworkEndpoint, version meta.Version) error { + req := &composite.NetworkEndpointGroupsDetachEndpointsRequest{NetworkEndpoints: endpoints} + return composite.DetachNetworkEndpoints(a.c, meta.ZonalKey(name, zone), version, req) } // ListNetworkEndpoints implements NetworkEndpointGroupCloud. -func (a *cloudProviderAdapter) ListNetworkEndpoints(name, zone string, showHealthStatus bool) ([]*compute.NetworkEndpointWithHealthStatus, error) { - ctx, cancel := cloud.ContextWithCallTimeout() - defer cancel() - +func (a *cloudProviderAdapter) ListNetworkEndpoints(name, zone string, showHealthStatus bool, version meta.Version) ([]*composite.NetworkEndpointWithHealthStatus, error) { healthStatus := "SKIP" if showHealthStatus { healthStatus = "SHOW" } - req := &compute.NetworkEndpointGroupsListEndpointsRequest{HealthStatus: healthStatus} - return a.c.NetworkEndpointGroups().ListNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req, filter.None) + req := &composite.NetworkEndpointGroupsListEndpointsRequest{HealthStatus: healthStatus} + return composite.ListNetworkEndpoints(a.c, meta.ZonalKey(name, zone), version, req) } // NetworkURL implements NetworkEndpointGroupCloud. diff --git a/pkg/neg/types/cloudprovideradapter_test.go b/pkg/neg/types/cloudprovideradapter_test.go index f0a1da8752..14a2eb366a 100644 --- a/pkg/neg/types/cloudprovideradapter_test.go +++ b/pkg/neg/types/cloudprovideradapter_test.go @@ -16,12 +16,13 @@ limitations under the License. package types import ( - "k8s.io/legacy-cloud-providers/gce" "testing" - "google.golang.org/api/compute/v1" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/legacy-cloud-providers/gce" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/ingress-gce/pkg/composite" ) func TestAggregatedListNetworkEndpointGroup(t *testing.T) { @@ -40,7 +41,7 @@ func TestAggregatedListNetworkEndpointGroup(t *testing.T) { validateAggregatedList(t, fakeCloud, 0, map[string][]string{}) - neg := &compute.NetworkEndpointGroup{Name: neg1} + neg := &composite.NetworkEndpointGroup{Name: neg1, Version: meta.VersionGA} zone := zone1 if err := fakeCloud.CreateNetworkEndpointGroup(neg, zone); err != nil { t.Fatalf("Got CreateNetworkEndpointGroup(%v, %v) = %v, want nil", neg, zone, err) @@ -48,7 +49,7 @@ func TestAggregatedListNetworkEndpointGroup(t *testing.T) { validateAggregatedList(t, fakeCloud, 1, map[string][]string{zone1: {neg1}}) - neg = &compute.NetworkEndpointGroup{Name: neg2} + neg = &composite.NetworkEndpointGroup{Name: neg2, Version: meta.VersionGA} zone = zone2 if err := fakeCloud.CreateNetworkEndpointGroup(neg, zone); err != nil { t.Fatalf("Got CreateNetworkEndpointGroup(%v, %v) = %v, want nil", neg, zone, err) @@ -56,7 +57,7 @@ func TestAggregatedListNetworkEndpointGroup(t *testing.T) { validateAggregatedList(t, fakeCloud, 2, map[string][]string{zone1: {neg1}, zone2: {neg2}}) - neg = &compute.NetworkEndpointGroup{Name: neg1} + neg = &composite.NetworkEndpointGroup{Name: neg1, Version: meta.VersionGA} zone = zone2 if err := fakeCloud.CreateNetworkEndpointGroup(neg, zone); err != nil { t.Fatalf("Got CreateNetworkEndpointGroup(%v, %v) = %v, want nil", neg, zone, err) @@ -64,7 +65,7 @@ func TestAggregatedListNetworkEndpointGroup(t *testing.T) { validateAggregatedList(t, fakeCloud, 2, map[string][]string{zone1: {neg1}, zone2: {neg1, neg2}}) - if err := fakeCloud.DeleteNetworkEndpointGroup(neg1, zone1); err != nil { + if err := fakeCloud.DeleteNetworkEndpointGroup(neg1, zone1, meta.VersionGA); err != nil { t.Fatalf("Got DeleteNetworkEndpointGroup(%v, %v) = %v, want nil", neg1, zone1, err) } @@ -72,7 +73,7 @@ func TestAggregatedListNetworkEndpointGroup(t *testing.T) { } func validateAggregatedList(t *testing.T, adapter NetworkEndpointGroupCloud, expectZoneNum int, expectZoneNegs map[string][]string) { - ret, err := adapter.AggregatedListNetworkEndpointGroup() + ret, err := adapter.AggregatedListNetworkEndpointGroup(meta.VersionGA) if err != nil { t.Errorf("Expect AggregatedListNetworkEndpointGroup to return nil error, but got %v", err) } diff --git a/pkg/neg/types/fakes.go b/pkg/neg/types/fakes.go index f93f630de4..169d30c077 100644 --- a/pkg/neg/types/fakes.go +++ b/pkg/neg/types/fakes.go @@ -23,8 +23,8 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - compute "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/ingress-gce/pkg/composite" ) const ( @@ -66,8 +66,8 @@ func (f *fakeZoneGetter) GetZoneForNode(name string) (string, error) { } type FakeNetworkEndpointGroupCloud struct { - NetworkEndpointGroups map[string][]*compute.NetworkEndpointGroup - NetworkEndpoints map[string][]*compute.NetworkEndpoint + NetworkEndpointGroups map[string][]*composite.NetworkEndpointGroup + NetworkEndpoints map[string][]*composite.NetworkEndpoint Subnetwork string Network string mu sync.Mutex @@ -77,14 +77,14 @@ func NewFakeNetworkEndpointGroupCloud(subnetwork, network string) NetworkEndpoin return &FakeNetworkEndpointGroupCloud{ Subnetwork: subnetwork, Network: network, - NetworkEndpointGroups: map[string][]*compute.NetworkEndpointGroup{}, - NetworkEndpoints: map[string][]*compute.NetworkEndpoint{}, + NetworkEndpointGroups: map[string][]*composite.NetworkEndpointGroup{}, + NetworkEndpoints: map[string][]*composite.NetworkEndpoint{}, } } var NotFoundError = fmt.Errorf("not Found") -func (f *FakeNetworkEndpointGroupCloud) GetNetworkEndpointGroup(name string, zone string) (*compute.NetworkEndpointGroup, error) { +func (f *FakeNetworkEndpointGroupCloud) GetNetworkEndpointGroup(name string, zone string, version meta.Version) (*composite.NetworkEndpointGroup, error) { f.mu.Lock() defer f.mu.Unlock() negs, ok := f.NetworkEndpointGroups[zone] @@ -102,36 +102,36 @@ func networkEndpointKey(name, zone string) string { return fmt.Sprintf("%s-%s", zone, name) } -func (f *FakeNetworkEndpointGroupCloud) ListNetworkEndpointGroup(zone string) ([]*compute.NetworkEndpointGroup, error) { +func (f *FakeNetworkEndpointGroupCloud) ListNetworkEndpointGroup(zone string, version meta.Version) ([]*composite.NetworkEndpointGroup, error) { f.mu.Lock() defer f.mu.Unlock() return f.NetworkEndpointGroups[zone], nil } -func (f *FakeNetworkEndpointGroupCloud) AggregatedListNetworkEndpointGroup() (map[string][]*compute.NetworkEndpointGroup, error) { +func (f *FakeNetworkEndpointGroupCloud) AggregatedListNetworkEndpointGroup(version meta.Version) (map[string][]*composite.NetworkEndpointGroup, error) { f.mu.Lock() defer f.mu.Unlock() return f.NetworkEndpointGroups, nil } -func (f *FakeNetworkEndpointGroupCloud) CreateNetworkEndpointGroup(neg *compute.NetworkEndpointGroup, zone string) error { +func (f *FakeNetworkEndpointGroupCloud) CreateNetworkEndpointGroup(neg *composite.NetworkEndpointGroup, zone string) error { f.mu.Lock() defer f.mu.Unlock() neg.SelfLink = cloud.NewNetworkEndpointGroupsResourceID("mock-project", zone, neg.Name).SelfLink(meta.VersionAlpha) if _, ok := f.NetworkEndpointGroups[zone]; !ok { - f.NetworkEndpointGroups[zone] = []*compute.NetworkEndpointGroup{} + f.NetworkEndpointGroups[zone] = []*composite.NetworkEndpointGroup{} } f.NetworkEndpointGroups[zone] = append(f.NetworkEndpointGroups[zone], neg) - f.NetworkEndpoints[networkEndpointKey(neg.Name, zone)] = []*compute.NetworkEndpoint{} + f.NetworkEndpoints[networkEndpointKey(neg.Name, zone)] = []*composite.NetworkEndpoint{} return nil } -func (f *FakeNetworkEndpointGroupCloud) DeleteNetworkEndpointGroup(name string, zone string) error { +func (f *FakeNetworkEndpointGroupCloud) DeleteNetworkEndpointGroup(name string, zone string, version meta.Version) error { f.mu.Lock() defer f.mu.Unlock() delete(f.NetworkEndpoints, networkEndpointKey(name, zone)) negs := f.NetworkEndpointGroups[zone] - newList := []*compute.NetworkEndpointGroup{} + newList := []*composite.NetworkEndpointGroup{} found := false for _, neg := range negs { if neg.Name == name { @@ -147,17 +147,17 @@ func (f *FakeNetworkEndpointGroupCloud) DeleteNetworkEndpointGroup(name string, return nil } -func (f *FakeNetworkEndpointGroupCloud) AttachNetworkEndpoints(name, zone string, endpoints []*compute.NetworkEndpoint) error { +func (f *FakeNetworkEndpointGroupCloud) AttachNetworkEndpoints(name, zone string, endpoints []*composite.NetworkEndpoint, version meta.Version) error { f.mu.Lock() defer f.mu.Unlock() f.NetworkEndpoints[networkEndpointKey(name, zone)] = append(f.NetworkEndpoints[networkEndpointKey(name, zone)], endpoints...) return nil } -func (f *FakeNetworkEndpointGroupCloud) DetachNetworkEndpoints(name, zone string, endpoints []*compute.NetworkEndpoint) error { +func (f *FakeNetworkEndpointGroupCloud) DetachNetworkEndpoints(name, zone string, endpoints []*composite.NetworkEndpoint, version meta.Version) error { f.mu.Lock() defer f.mu.Unlock() - newList := []*compute.NetworkEndpoint{} + newList := []*composite.NetworkEndpoint{} for _, ne := range f.NetworkEndpoints[networkEndpointKey(name, zone)] { found := false for _, remove := range endpoints { @@ -175,16 +175,16 @@ func (f *FakeNetworkEndpointGroupCloud) DetachNetworkEndpoints(name, zone string return nil } -func (f *FakeNetworkEndpointGroupCloud) ListNetworkEndpoints(name, zone string, showHealthStatus bool) ([]*compute.NetworkEndpointWithHealthStatus, error) { +func (f *FakeNetworkEndpointGroupCloud) ListNetworkEndpoints(name, zone string, showHealthStatus bool, version meta.Version) ([]*composite.NetworkEndpointWithHealthStatus, error) { f.mu.Lock() defer f.mu.Unlock() - ret := []*compute.NetworkEndpointWithHealthStatus{} + ret := []*composite.NetworkEndpointWithHealthStatus{} nes, ok := f.NetworkEndpoints[networkEndpointKey(name, zone)] if !ok { return nil, NotFoundError } for _, ne := range nes { - ret = append(ret, &compute.NetworkEndpointWithHealthStatus{NetworkEndpoint: ne}) + ret = append(ret, &composite.NetworkEndpointWithHealthStatus{NetworkEndpoint: ne}) } return ret, nil } diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index 8e4d4e6a05..38f36a17f4 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -17,7 +17,8 @@ limitations under the License. package types import ( - compute "google.golang.org/api/compute/v1" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/ingress-gce/pkg/composite" ) // ZoneGetter is an interface for retrieve zone related information @@ -28,14 +29,14 @@ type ZoneGetter interface { // NetworkEndpointGroupCloud is an interface for managing gce network endpoint group. type NetworkEndpointGroupCloud interface { - GetNetworkEndpointGroup(name string, zone string) (*compute.NetworkEndpointGroup, error) - ListNetworkEndpointGroup(zone string) ([]*compute.NetworkEndpointGroup, error) - AggregatedListNetworkEndpointGroup() (map[string][]*compute.NetworkEndpointGroup, error) - CreateNetworkEndpointGroup(neg *compute.NetworkEndpointGroup, zone string) error - DeleteNetworkEndpointGroup(name string, zone string) error - AttachNetworkEndpoints(name, zone string, endpoints []*compute.NetworkEndpoint) error - DetachNetworkEndpoints(name, zone string, endpoints []*compute.NetworkEndpoint) error - ListNetworkEndpoints(name, zone string, showHealthStatus bool) ([]*compute.NetworkEndpointWithHealthStatus, error) + GetNetworkEndpointGroup(name string, zone string, version meta.Version) (*composite.NetworkEndpointGroup, error) + ListNetworkEndpointGroup(zone string, version meta.Version) ([]*composite.NetworkEndpointGroup, error) + AggregatedListNetworkEndpointGroup(version meta.Version) (map[string][]*composite.NetworkEndpointGroup, error) + CreateNetworkEndpointGroup(neg *composite.NetworkEndpointGroup, zone string) error + DeleteNetworkEndpointGroup(name string, zone string, version meta.Version) error + AttachNetworkEndpoints(name, zone string, endpoints []*composite.NetworkEndpoint, version meta.Version) error + DetachNetworkEndpoints(name, zone string, endpoints []*composite.NetworkEndpoint, version meta.Version) error + ListNetworkEndpoints(name, zone string, showHealthStatus bool, version meta.Version) ([]*composite.NetworkEndpointWithHealthStatus, error) NetworkURL() string SubnetworkURL() string } diff --git a/pkg/neg/types/mock.go b/pkg/neg/types/mock.go index 0085ecbb86..dbf3bc4aba 100644 --- a/pkg/neg/types/mock.go +++ b/pkg/neg/types/mock.go @@ -20,13 +20,14 @@ import ( "context" "fmt" + "net/http" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/filter" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" "k8s.io/legacy-cloud-providers/gce" - "net/http" ) type NetworkEndpointEntry struct { @@ -48,7 +49,7 @@ func (s NetworkEndpointStore) AddNetworkEndpointHealthStatus(key meta.Key, entry // GetNetworkEndpointStore is a helper function to access the NetworkEndpointStore of the mock NEG cloud func GetNetworkEndpointStore(negCloud NetworkEndpointGroupCloud) NetworkEndpointStore { adapter := negCloud.(*cloudProviderAdapter) - mockedCloud := adapter.c.(*cloud.MockGCE) + mockedCloud := adapter.c.Compute().(*cloud.MockGCE) ret := mockedCloud.MockNetworkEndpointGroups.X.(NetworkEndpointStore) return ret } diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index 8fb5242e37..5ccb58b47d 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -22,6 +22,8 @@ import ( "strconv" "strings" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + istioV1alpha3 "istio.io/api/networking/v1alpha3" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -33,6 +35,7 @@ type NetworkEndpointType string const ( VmIpPortEndpointType = NetworkEndpointType("GCE_VM_IP_PORT") + VmPrimaryIpEndpointType = NetworkEndpointType("GCE_VM_PRIMARY_IP") NonGCPPrivateEndpointType = NetworkEndpointType("NON_GCP_PRIVATE_IP_PORT") ) @@ -252,5 +255,16 @@ func (key NegSyncerKey) String() string { return fmt.Sprintf("%s/%s-%s-%s", key.Namespace, key.Name, key.Subset, key.PortTuple.String()) } +// GetAPIVersion returns the compute API version to be used in order +// to create the negType specified in the given NegSyncerKey. +func (key NegSyncerKey) GetAPIVersion() meta.Version { + switch key.NegType { + case VmPrimaryIpEndpointType: + return meta.VersionAlpha + default: + return meta.VersionGA + } +} + // EndpointPodMap is a map from network endpoint to a namespaced name of a pod type EndpointPodMap map[NetworkEndpoint]types.NamespacedName diff --git a/pkg/utils/serviceport.go b/pkg/utils/serviceport.go index 3810dc4321..710e08f1b0 100644 --- a/pkg/utils/serviceport.go +++ b/pkg/utils/serviceport.go @@ -19,6 +19,8 @@ package utils import ( "fmt" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/api/networking/v1beta1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -53,6 +55,16 @@ type ServicePort struct { BackendNamer namer.BackendNamer } +// GetAPIVersionFromServicePort returns the compute API version to be used +// for creating NEGs associated with the given ServicePort. +func GetAPIVersionFromServicePort(sp *ServicePort) meta.Version { + if sp == nil { + // this uses VM_PRIMARY_IP_NEGS which requires alpha API + return meta.VersionAlpha + } + return meta.VersionGA +} + // GetDescription returns a Description for this ServicePort. func (sp ServicePort) GetDescription() Description { return Description{ From e01a6265453cf50048a9e4b37edcd59b674b3607 Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Mon, 4 Nov 2019 13:10:05 -0800 Subject: [PATCH 2/3] Composite change to AggregatedList There is some code duplication since map[string]Struct cannot be cast to map[string]interface{} Fixed a bug in ListNetworkEndpointGroups, where the incorrect err variable was being returned. --- pkg/composite/gen/main.go | 79 +++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 28 deletions(-) diff --git a/pkg/composite/gen/main.go b/pkg/composite/gen/main.go index 06db3ded7f..a48022061c 100644 --- a/pkg/composite/gen/main.go +++ b/pkg/composite/gen/main.go @@ -681,23 +681,23 @@ func {{.GetGroupResourceInfo.ListFuncName}}(gceCloud *gce.Cloud, key *meta.Key, switch version { case meta.VersionAlpha: - alphareq, err := req.ToAlpha() - if err != nil { - return nil, err + alphareq, reqerr := req.ToAlpha() + if reqerr != nil { + return nil, reqerr } klog.V(3).Infof("Listing alpha zonal {{.Name}} %v", key.Name) gceObjs, err = gceCloud.Compute().Alpha{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.ListFuncName}}(ctx, key, alphareq, filter.None) case meta.VersionBeta: - betareq, err := req.ToBeta() - if err != nil { - return nil, err + betareq, reqerr := req.ToBeta() + if reqerr != nil { + return nil, reqerr } klog.V(3).Infof("Listing beta zonal {{.Name}} %v", key.Name) gceObjs, err = gceCloud.Compute().Beta{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.ListFuncName}}(ctx, key, betareq, filter.None) default: - gareq, err := req.ToGA() - if err != nil { - return nil, err + gareq, reqerr := req.ToGA() + if reqerr != nil { + return nil, reqerr } klog.V(3).Infof("Listing ga zonal {{.Name}} %v", key.Name) gceObjs, err = gceCloud.Compute().{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.ListFuncName}}(ctx, key, gareq, filter.None) @@ -716,43 +716,66 @@ func {{.GetGroupResourceInfo.ListFuncName}}(gceCloud *gce.Cloud, key *meta.Key, return compositeObjs, nil } -func {{.GetGroupResourceInfo.AggListFuncName}}{{.GetGroupResourceInfo.AggListRespName}}(gceCloud *gce.Cloud, version meta.Version) (map[string][]*{{.GetGroupResourceInfo.AggListRespName}}, error) { +func {{.GetGroupResourceInfo.AggListFuncName}}{{.GetGroupResourceInfo.AggListRespName}}(gceCloud *gce.Cloud, version meta.Version) (map[*meta.Key]*{{.GetGroupResourceInfo.AggListRespName}}, error) { ctx, cancel := cloudprovider.ContextWithCallTimeout() defer cancel() mc := compositemetrics.NewMetricContext("{{.Name}}", "aggregateList", "", "", string(version)) + compositeMap := make(map[*meta.Key]*{{.GetGroupResourceInfo.AggListRespName}}) var gceObjs interface{} - compositeMap := make(map[string][]*{{.GetGroupResourceInfo.AggListRespName}}) - var err error switch version { case meta.VersionAlpha: klog.V(3).Infof("Aggregate List of alpha zonal {{.Name}}") - gceObjs, err = gceCloud.Compute().Alpha{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None) + alphaMap, err := gceCloud.Compute().Alpha{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None) + if err != nil { + return nil, mc.Observe(err) + } + // Convert from map to list + alphaList := []*computealpha.{{.GetGroupResourceInfo.AggListRespName}}{} + for _, val := range alphaMap { + alphaList = append(alphaList, val...) + } + gceObjs = alphaList case meta.VersionBeta: klog.V(3).Infof("Aggregate List of beta zonal {{.Name}}") - gceObjs, err = gceCloud.Compute().Beta{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None) + betaMap, err := gceCloud.Compute().Beta{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None) + if err != nil { + return nil, mc.Observe(err) + } + // Convert from map to list + betaList := []*computebeta.{{.GetGroupResourceInfo.AggListRespName}}{} + for _, val := range betaMap { + betaList = append(betaList, val...) + } + gceObjs = betaList default: klog.V(3).Infof("Aggregate List of ga zonal {{.Name}}") - gceObjs, err = gceCloud.Compute().{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None) + gaMap, err := gceCloud.Compute().{{.GetCloudProviderName}}().{{.GetGroupResourceInfo.AggListFuncName}}(ctx, filter.None) + if err != nil { + return nil, mc.Observe(err) + } + // Convert from map to list + gaList := []*compute.{{.GetGroupResourceInfo.AggListRespName}}{} + for _, val := range gaMap { + gaList = append(gaList, val...) + } + gceObjs = gaList } + compositeObjs, err := To{{.GetGroupResourceInfo.AggListRespName}}List(gceObjs) if err != nil { - return nil, mc.Observe(err) - } - gceMap, ok := gceObjs.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("Failed to convert gceObj to map[string]interface{}") + return nil, err } - for keyStr, obj := range gceMap { - compositeObjs, err := To{{.GetGroupResourceInfo.AggListRespName}}List(obj) - if err != nil { - return nil, err - } - for _, o := range compositeObjs { - o.Version = version + for _, obj := range compositeObjs { + obj.Version = version + resourceID, err := cloudprovider.ParseResourceURL(obj.SelfLink) + if err != nil || resourceID == nil || resourceID.Key == nil { + klog.Errorf("Failed to parse SelfLink - %s for obj %v, err %v", obj.SelfLink, obj, err) + continue } - compositeMap[keyStr] = compositeObjs + compositeMap[resourceID.Key] = obj } + return compositeMap, nil } {{end}} {{/*IsGroupResourceService*/}} From 3849ae4046768a536aa17baadd86f172279a9021 Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Mon, 4 Nov 2019 13:12:43 -0800 Subject: [PATCH 3/3] Autogenerated changes. --- pkg/composite/gen.go | 79 ++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 28 deletions(-) diff --git a/pkg/composite/gen.go b/pkg/composite/gen.go index cf14701a88..30e1f170d3 100644 --- a/pkg/composite/gen.go +++ b/pkg/composite/gen.go @@ -3726,23 +3726,23 @@ func ListNetworkEndpoints(gceCloud *gce.Cloud, key *meta.Key, version meta.Versi switch version { case meta.VersionAlpha: - alphareq, err := req.ToAlpha() - if err != nil { - return nil, err + alphareq, reqerr := req.ToAlpha() + if reqerr != nil { + return nil, reqerr } klog.V(3).Infof("Listing alpha zonal NetworkEndpointGroup %v", key.Name) gceObjs, err = gceCloud.Compute().AlphaNetworkEndpointGroups().ListNetworkEndpoints(ctx, key, alphareq, filter.None) case meta.VersionBeta: - betareq, err := req.ToBeta() - if err != nil { - return nil, err + betareq, reqerr := req.ToBeta() + if reqerr != nil { + return nil, reqerr } klog.V(3).Infof("Listing beta zonal NetworkEndpointGroup %v", key.Name) gceObjs, err = gceCloud.Compute().BetaNetworkEndpointGroups().ListNetworkEndpoints(ctx, key, betareq, filter.None) default: - gareq, err := req.ToGA() - if err != nil { - return nil, err + gareq, reqerr := req.ToGA() + if reqerr != nil { + return nil, reqerr } klog.V(3).Infof("Listing ga zonal NetworkEndpointGroup %v", key.Name) gceObjs, err = gceCloud.Compute().NetworkEndpointGroups().ListNetworkEndpoints(ctx, key, gareq, filter.None) @@ -3761,43 +3761,66 @@ func ListNetworkEndpoints(gceCloud *gce.Cloud, key *meta.Key, version meta.Versi return compositeObjs, nil } -func AggregatedListNetworkEndpointGroup(gceCloud *gce.Cloud, version meta.Version) (map[string][]*NetworkEndpointGroup, error) { +func AggregatedListNetworkEndpointGroup(gceCloud *gce.Cloud, version meta.Version) (map[*meta.Key]*NetworkEndpointGroup, error) { ctx, cancel := cloudprovider.ContextWithCallTimeout() defer cancel() mc := compositemetrics.NewMetricContext("NetworkEndpointGroup", "aggregateList", "", "", string(version)) + compositeMap := make(map[*meta.Key]*NetworkEndpointGroup) var gceObjs interface{} - compositeMap := make(map[string][]*NetworkEndpointGroup) - var err error switch version { case meta.VersionAlpha: klog.V(3).Infof("Aggregate List of alpha zonal NetworkEndpointGroup") - gceObjs, err = gceCloud.Compute().AlphaNetworkEndpointGroups().AggregatedList(ctx, filter.None) + alphaMap, err := gceCloud.Compute().AlphaNetworkEndpointGroups().AggregatedList(ctx, filter.None) + if err != nil { + return nil, mc.Observe(err) + } + // Convert from map to list + alphaList := []*computealpha.NetworkEndpointGroup{} + for _, val := range alphaMap { + alphaList = append(alphaList, val...) + } + gceObjs = alphaList case meta.VersionBeta: klog.V(3).Infof("Aggregate List of beta zonal NetworkEndpointGroup") - gceObjs, err = gceCloud.Compute().BetaNetworkEndpointGroups().AggregatedList(ctx, filter.None) + betaMap, err := gceCloud.Compute().BetaNetworkEndpointGroups().AggregatedList(ctx, filter.None) + if err != nil { + return nil, mc.Observe(err) + } + // Convert from map to list + betaList := []*computebeta.NetworkEndpointGroup{} + for _, val := range betaMap { + betaList = append(betaList, val...) + } + gceObjs = betaList default: klog.V(3).Infof("Aggregate List of ga zonal NetworkEndpointGroup") - gceObjs, err = gceCloud.Compute().NetworkEndpointGroups().AggregatedList(ctx, filter.None) + gaMap, err := gceCloud.Compute().NetworkEndpointGroups().AggregatedList(ctx, filter.None) + if err != nil { + return nil, mc.Observe(err) + } + // Convert from map to list + gaList := []*compute.NetworkEndpointGroup{} + for _, val := range gaMap { + gaList = append(gaList, val...) + } + gceObjs = gaList } + compositeObjs, err := ToNetworkEndpointGroupList(gceObjs) if err != nil { - return nil, mc.Observe(err) - } - gceMap, ok := gceObjs.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("Failed to convert gceObj to map[string]interface{}") + return nil, err } - for keyStr, obj := range gceMap { - compositeObjs, err := ToNetworkEndpointGroupList(obj) - if err != nil { - return nil, err - } - for _, o := range compositeObjs { - o.Version = version + for _, obj := range compositeObjs { + obj.Version = version + resourceID, err := cloudprovider.ParseResourceURL(obj.SelfLink) + if err != nil || resourceID == nil || resourceID.Key == nil { + klog.Errorf("Failed to parse SelfLink - %s for obj %v, err %v", obj.SelfLink, obj, err) + continue } - compositeMap[keyStr] = compositeObjs + compositeMap[resourceID.Key] = obj } + return compositeMap, nil }