From 15326f45ada1075d7e14052f73de34e83f857aba Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Mon, 30 Dec 2019 14:29:11 -0800 Subject: [PATCH] Added new controller for L4 ILB services Fixes including adding CustomSubnet support and self links for backend service, healthcheck. Removed the operation from queue key Added connectiondraining to backend service. Added a new flag to run L4 controller. Used namer.PrimaryIPNEG() instead of namer.NEG() Use a common description for all ILB resources. --- cmd/glbc/main.go | 6 + pkg/backends/backends.go | 94 ++++++++++ pkg/backends/features/features.go | 7 +- pkg/backends/syncer.go | 5 +- pkg/controller/service_controller.go | 259 ++++++++++++++++++++++++++ pkg/firewalls/firewalls_l4.go | 99 ++++++++++ pkg/healthchecks/healthchecks_l4.go | 156 ++++++++++++++++ pkg/loadbalancers/forwarding_rules.go | 170 +++++++++++++++++ pkg/loadbalancers/l4.go | 196 +++++++++++++++++++ pkg/utils/common/finalizer.go | 42 ++++- pkg/utils/namer/interfaces.go | 2 + pkg/utils/namer/namer.go | 10 +- pkg/utils/serviceport.go | 21 ++- pkg/utils/utils.go | 130 ++++++++++++- pkg/utils/utils_test.go | 4 +- 15 files changed, 1175 insertions(+), 26 deletions(-) create mode 100644 pkg/controller/service_controller.go create mode 100644 pkg/firewalls/firewalls_l4.go create mode 100644 pkg/healthchecks/healthchecks_l4.go create mode 100644 pkg/loadbalancers/l4.go diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 88e8a91bc4..0e57f463fa 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -218,6 +218,12 @@ func runControllers(ctx *ingctx.ControllerContext) { fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values()) + if flags.F.RunL4Controller { + l4Controller := controller.NewL4Controller(ctx, stopCh) + go l4Controller.Run() + klog.V(0).Infof("L4 controller started") + } + // TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController. negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller) diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index f8687cdca8..aaced1c350 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -19,6 +19,8 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-gce/pkg/backends/features" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/utils" @@ -27,6 +29,10 @@ import ( "k8s.io/legacy-cloud-providers/gce" ) +const ( + DefaultConnectionDrainingTimeoutSeconds = 30 +) + // Backends handles CRUD operations for backends. type Backends struct { cloud *gce.Cloud @@ -224,3 +230,91 @@ func (b *Backends) List(key *meta.Key, version meta.Version) ([]*composite.Backe } return clusterBackends, nil } + +// EnsureL4BackendService creates or updates the backend service with the given name. +func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, version meta.Version) (*composite.BackendService, error) { + klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): checking existing backend service", name, scheme, protocol) + key, err := composite.CreateKey(b.cloud, name, meta.Regional) + bs, err := composite.GetBackendService(b.cloud, key, meta.VersionGA) + if err != nil && !utils.IsNotFoundError(err) { + return nil, err + } + desc, err := utils.MakeL4ILBServiceDescription(nm.String(), "", meta.VersionGA) + if err != nil { + klog.Warningf("EnsureL4BackendService: Failed to generate description for BackendService %s, err %v", + name, err) + } + expectedBS := &composite.BackendService{ + Name: name, + Protocol: string(protocol), + Description: desc, + HealthChecks: []string{hcLink}, + SessionAffinity: utils.TranslateAffinityType(sessionAffinity), + LoadBalancingScheme: string(scheme), + ConnectionDraining: &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds}, + } + + // Create backend service if none was found + if bs == nil { + klog.V(2).Infof("EnsureL4BackendService: creating backend service %v", name) + err := composite.CreateBackendService(b.cloud, key, expectedBS) + if err != nil { + return nil, err + } + klog.V(2).Infof("EnsureL4BackendService: created backend service %v successfully", name) + // We need to perform a GCE call to re-fetch the object we just created + // so that the "Fingerprint" field is filled in. This is needed to update the + // object without error. The lookup is also needed to populate the selfLink. + return composite.GetBackendService(b.cloud, key, meta.VersionGA) + } + + if backendSvcEqual(expectedBS, bs) { + return bs, nil + } + if bs.ConnectionDraining != nil && bs.ConnectionDraining.DrainingTimeoutSec > 0 { + // if user overrides this value, continue using that. + expectedBS.ConnectionDraining.DrainingTimeoutSec = bs.ConnectionDraining.DrainingTimeoutSec + } + klog.V(2).Infof("EnsureL4BackendService: updating backend service %v", name) + // Set fingerprint for optimistic locking + expectedBS.Fingerprint = bs.Fingerprint + if err := composite.UpdateBackendService(b.cloud, key, expectedBS); err != nil { + return nil, err + } + klog.V(2).Infof("EnsureL4BackendService: updated backend service %v successfully", name) + return composite.GetBackendService(b.cloud, key, meta.VersionGA) +} + +// backendsListEqual asserts that backend lists are equal by group link only +func backendsListEqual(a, b []*composite.Backend) bool { + if len(a) != len(b) { + return false + } + if len(a) == 0 { + return true + } + + aSet := sets.NewString() + for _, v := range a { + aSet.Insert(v.Group) + } + bSet := sets.NewString() + for _, v := range b { + bSet.Insert(v.Group) + } + + return aSet.Equal(bSet) +} + +// backendSvcEqual returns true if the 2 BackendService objects are equal. +// ConnectionDraining timeout is not checked for equality, if user changes +// this timeout and no other backendService parameters change, the backend +// service will not be updated. +func backendSvcEqual(a, b *composite.BackendService) bool { + return a.Protocol == b.Protocol && + a.Description == b.Description && + a.SessionAffinity == b.SessionAffinity && + a.LoadBalancingScheme == b.LoadBalancingScheme && + utils.EqualStringSets(a.HealthChecks, b.HealthChecks) && + backendsListEqual(a.Backends, b.Backends) +} diff --git a/pkg/backends/features/features.go b/pkg/backends/features/features.go index e3c5776cc2..471afc39de 100644 --- a/pkg/backends/features/features.go +++ b/pkg/backends/features/features.go @@ -36,6 +36,8 @@ const ( // FeatureL7ILB defines the feature name of L7 Internal Load Balancer // L7-ILB Resources are currently alpha and regional FeatureL7ILB = "L7ILB" + //FeaturePrimaryVMIPNEG defines the feature name of GCE_PRIMARY_VM_IP NEGs which are used for L4 ILB. + FeaturePrimaryVMIPNEG = "PrimaryVMIPNEG" ) var ( @@ -46,7 +48,7 @@ var ( } // TODO: (shance) refactor all scope to be above the serviceport level scopeToFeatures = map[meta.KeyType][]string{ - meta.Regional: []string{FeatureL7ILB}, + meta.Regional: []string{FeatureL7ILB, FeaturePrimaryVMIPNEG}, } ) @@ -68,6 +70,9 @@ func featuresFromServicePort(sp *utils.ServicePort) []string { if sp.NEGEnabled { features = append(features, FeatureNEG) } + if sp.PrimaryIPNEGEnabled { + features = append(features, FeaturePrimaryVMIPNEG) + } if sp.L7ILBEnabled { features = append(features, FeatureL7ILB) } diff --git a/pkg/backends/syncer.go b/pkg/backends/syncer.go index a7b331bc6e..7885eca0e7 100644 --- a/pkg/backends/syncer.go +++ b/pkg/backends/syncer.go @@ -179,6 +179,10 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error { // gc deletes the provided backends func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets.String) error { for _, be := range backends { + // Skip L4 LB backend services + if strings.Contains(be.Description, utils.L4ILBServiceDescKey) { + continue + } var key *meta.Key name := be.Name scope, err := composite.ScopeFromSelfLink(be.SelfLink) @@ -191,7 +195,6 @@ func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets if knownPorts.Has(key.String()) { continue } - klog.V(2).Infof("GCing backendService for port %s", name) err = s.backendPool.Delete(name, be.Version, scope) if err != nil { diff --git a/pkg/controller/service_controller.go b/pkg/controller/service_controller.go new file mode 100644 index 0000000000..b89e44823b --- /dev/null +++ b/pkg/controller/service_controller.go @@ -0,0 +1,259 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "reflect" + + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/controller/translator" + "k8s.io/ingress-gce/pkg/loadbalancers" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/common" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/apis/core/v1/helper" +) + +// L4Controller manages the create/update delete of all L4 Internal LoadBalancer services. +type L4Controller struct { + ctx *context.ControllerContext + // kubeClient, needed for attaching finalizer + client kubernetes.Interface + svcQueue utils.TaskQueue + serviceLister cache.Indexer + nodeLister listers.NodeLister + stopCh chan struct{} + // mapping of service key to L4Handler that creates all resources. + handlerMap map[string]*loadbalancers.L4 + // needed for listing the zones in the cluster. + translator *translator.Translator + // needed for linking the NEG with the backend service for each ILB service. + NegLinker backends.Linker + backendPool *backends.Backends +} + +// NewL4Controller creates a new instance of the L4 ILB controller. +func NewL4Controller(ctx *context.ControllerContext, stopCh chan struct{}) *L4Controller { + l4c := &L4Controller{ + ctx: ctx, + client: ctx.KubeClient, + serviceLister: ctx.ServiceInformer.GetIndexer(), + nodeLister: listers.NewNodeLister(ctx.NodeInformer.GetIndexer()), + stopCh: stopCh, + } + l4c.translator = translator.NewTranslator(ctx) + l4c.backendPool = backends.NewPool(ctx.Cloud, ctx.ClusterNamer) + l4c.NegLinker = backends.NewNEGLinker(l4c.backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud) + + l4c.handlerMap = make(map[string]*loadbalancers.L4) + l4c.svcQueue = utils.NewPeriodicTaskQueue("l4", "services", l4c.sync) + ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addSvc := obj.(*v1.Service) + svcKey := utils.ServiceKeyFunc(addSvc.Namespace, addSvc.Name) + if common.HasGivenFinalizer(addSvc.ObjectMeta, common.FinalizerKeyV2) { + // This might be an update or delete showing up as Add since the controller probably exited. + l4c.ctx.Recorder(addSvc.Namespace).Eventf(addSvc, v1.EventTypeNormal, "ADD", svcKey) + l4c.svcQueue.Enqueue(addSvc) + return + } + if needsILB, svcType := annotations.WantsL4ILB(addSvc); !needsILB { + klog.V(4).Infof("Ignoring add for non-lb service %s based on %v", svcKey, svcType) + return + } + klog.V(3).Infof("Service %s added, enqueuing", svcKey) + l4c.ctx.Recorder(addSvc.Namespace).Eventf(addSvc, v1.EventTypeNormal, "ADD", svcKey) + l4c.svcQueue.Enqueue(addSvc) + }, + // Deletes will be handled in the Update when the deletion timestamp is set. + UpdateFunc: func(old, cur interface{}) { + curSvc := cur.(*v1.Service) + svcKey := utils.ServiceKeyFunc(curSvc.Namespace, curSvc.Name) + if common.HasGivenFinalizer(curSvc.ObjectMeta, common.FinalizerKeyV2) { + if reflect.DeepEqual(old, cur) { + // TODO when will this be true? + klog.V(3).Infof("Periodic enqueueing of %v", svcKey) + } else { + klog.V(3).Infof("Service %v changed, enqueuing", svcKey) + } + l4c.svcQueue.Enqueue(curSvc) + } + }, + }) + // TODO enhance this by looking at some metric from service controller to ensure it is up. + // We cannot use existence of a backend service or other resource, since those are on a per-service basis. + ctx.AddHealthCheck("service-controller health", func() error { return nil }) + return l4c +} + +func (l4c *L4Controller) Run() { + defer l4c.shutdown() + go l4c.svcQueue.Run() + <-l4c.stopCh +} + +// This should only be called when the process is being terminated. +func (l4c *L4Controller) shutdown() { + klog.Infof("Shutting down L4 Service Controller") + l4c.svcQueue.Shutdown() +} + +func (l4c *L4Controller) getL4Handler(svcKey string, service *v1.Service) *loadbalancers.L4 { + l4, ok := l4c.handlerMap[svcKey] + if !ok { + l4 = loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.ctx.ClusterNamer, l4c.ctx.Recorder(service.Namespace)) + l4c.handlerMap[svcKey] = l4 + } + if service != nil { + l4.Service = service + } + return l4 +} + +// processServiceCreateOrUpdate ensures load balancer resources for the given service, as needed. +// Returns an error if processing the service update failed. +func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Service) error { + // skip services that are being handled by the legacy service controller. + if utils.IsLegacyL4ILBService(service) { + klog.Warningf("Ignoring update for service %s:%s managed by service controller", + service.Namespace, service.Name) + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerSkipped", + fmt.Sprintf("skipping l4 load balancer sync as service contains '%s' finalizer", common.LegacyILBFinalizer)) + return nil + } + + // Ensure v2 finalizer + if err := common.EnsureServiceFinalizer(service, common.ILBFinalizerV2, l4c.ctx.KubeClient); err != nil { + return fmt.Errorf("Failed to attach finalizer to service %s/%s, err %v", service.Namespace, service.Name, err) + } + l4 := l4c.getL4Handler(key, service) + nodeNames, err := utils.GetReadyNodeNames(l4c.nodeLister) + if err != nil { + return err + } + // Use the same function for both create and updates. If controller crashes and restarts, + // all existing services will show up as Service Adds. + status, err := l4.EnsureInternalLoadBalancer(nodeNames, service) + if err != nil { + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", + "Error syncing load balancer: %v", err) + return err + } + if status == nil { + return fmt.Errorf("service status returned by EnsureInternalLoadBalancer is nil") + } + if err = l4c.linkNEG(l4); err != nil { + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", + "Failed to link NEG with Backend Service for load balancer, err: %v", err) + return err + } + err = l4c.updateServiceStatus(service, status) + if err != nil { + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", + "Error updating load balancer status: %v", err) + return err + } + return nil +} + +func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) error { + l4 := l4c.getL4Handler(key, svc) + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key) + if err := l4.EnsureInternalLoadBalancerDeleted(svc); err != nil { + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err) + return err + } + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") + if err := common.EnsureDeleteServiceFinalizer(svc, common.ILBFinalizerV2, l4c.ctx.KubeClient); err != nil { + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancerFailed", + "Error removing finalizer from load balancer: %v", err) + return err + } + // Reset the loadbalancer status, Ignore NotFound error since the service can already be deleted at this point. + if err := l4c.updateServiceStatus(svc, &v1.LoadBalancerStatus{}); utils.IgnoreHTTPNotFound(err) != nil { + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer", + "Error reseting load balancer status to empty: %v", err) + return err + } + return nil +} + +// linkNEG associates the NEG to the backendService for the given L4 ILB service. +func (l4c *L4Controller) linkNEG(l4 *loadbalancers.L4) error { + // link neg to backend service + zones, err := l4c.translator.ListZones() + if err != nil { + return nil + } + var groupKeys []backends.GroupKey + for _, zone := range zones { + groupKeys = append(groupKeys, backends.GroupKey{Zone: zone}) + } + return l4c.NegLinker.Link(l4.ServicePort, groupKeys) +} + +func (l4c *L4Controller) sync(key string) error { + svc, exists, err := l4c.ctx.Services().GetByKey(key) + if err != nil { + return fmt.Errorf("Failed to lookup service for key %s : %s", key, err) + } + if !exists || svc == nil { + // The service will not exist if its resources and finalizer are handled by the legacy service controller and + // it has been deleted. As long as the V2 finalizer is present, the service will not be deleted by apiserver. + klog.V(3).Infof("Ignoring delete of service %s not managed by L4 controller", key) + return nil + } + if common.IsDeletionCandidateForGivenFinalizer(svc.ObjectMeta, common.ILBFinalizerV2) { + klog.V(2).Infof("Deleting ILB resources for service %s managed by L4 controller", key) + return l4c.processServiceDeletion(key, svc) + } + needsILB, svcType := annotations.WantsL4ILB(svc) + if !needsILB { + klog.V(2).Infof("Service %s of type %s does not require L4 ILB, cleaning up resources", key, svcType) + return l4c.processServiceDeletion(key, svc) + } + return l4c.processServiceCreateOrUpdate(key, svc) +} + +// TODO move to patch instead of update +func (l4c *L4Controller) updateServiceStatus(svc *v1.Service, newStatus *v1.LoadBalancerStatus) error { + if helper.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) { + return nil + } + svcClient := l4c.ctx.KubeClient.CoreV1().Services(svc.Namespace) + // Get the current service since NEG controller would have modified it by adding annotation + current, err := svcClient.Get(svc.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updated := current.DeepCopy() + updated.Status.LoadBalancer = *newStatus + if _, err := svcClient.UpdateStatus(updated); err != nil { + return err + } + return nil +} diff --git a/pkg/firewalls/firewalls_l4.go b/pkg/firewalls/firewalls_l4.go new file mode 100644 index 0000000000..00d436298a --- /dev/null +++ b/pkg/firewalls/firewalls_l4.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package firewalls + +import ( + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/compute/v1" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" + "strings" +) + +func EnsureL4InternalFirewallRule(cloud *gce.Cloud, fwName, lbIP, svcName string, sourceRanges, portRanges, nodeNames []string, proto string) error { + existingFw, err := cloud.GetFirewall(fwName) + if err != nil && !utils.IsNotFoundError(err) { + return err + } + + nodeTags, err := cloud.GetNodeTags(nodeNames) + if err != nil { + return err + } + fwDesc, err := utils.MakeL4ILBServiceDescription(svcName, lbIP, meta.VersionGA) + if err != nil { + klog.Warningf("EnsureL4InternalFirewallRule: Failed to generate description for rule %s, err: %v", + fwName, err) + } + expectedFw := &compute.Firewall{ + Name: fwName, + Description: fwDesc, + Network: cloud.NetworkURL(), + SourceRanges: sourceRanges, + TargetTags: nodeTags, + Allowed: []*compute.FirewallAllowed{ + { + IPProtocol: strings.ToLower(proto), + Ports: portRanges, + }, + }, + } + if existingFw == nil { + klog.V(2).Infof("EnsureL4InternalFirewallRule(%v): creating firewall", fwName) + err = cloud.CreateFirewall(expectedFw) + if utils.IsForbiddenError(err) && cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudCreateCmd(expectedFw, cloud.NetworkProjectID()) + + klog.V(3).Infof("EnsureL4InternalFirewallRule(%v): Could not create L4 firewall on XPN cluster: %v. Raising event for cmd: %q", fwName, err, gcloudCmd) + return newFirewallXPNError(err, gcloudCmd) + } + return err + } + if firewallRuleEqual(expectedFw, existingFw) { + return nil + } + klog.V(2).Infof("EnsureL4InternalFirewallRule(%v): updating firewall", fwName) + err = cloud.UpdateFirewall(expectedFw) + if utils.IsForbiddenError(err) && cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudUpdateCmd(expectedFw, cloud.NetworkProjectID()) + klog.V(3).Infof("EnsureL4InternalFirewallRule(%v): Could not update L4 firewall on XPN cluster: %v. Raising event for cmd: %q", fwName, err, gcloudCmd) + return newFirewallXPNError(err, gcloudCmd) + } + return err +} + +func EnsureL4InternalFirewallRuleDeleted(cloud *gce.Cloud, fwName string) error { + if err := utils.IgnoreHTTPNotFound(cloud.DeleteFirewall(fwName)); err != nil { + if utils.IsForbiddenError(err) && cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudDeleteCmd(fwName, cloud.NetworkProjectID()) + klog.V(2).Infof("EnsureL4InternalFirewallRuleDeleted(%v): could not delete traffic firewall on XPN cluster. Raising event.", fwName) + return newFirewallXPNError(err, gcloudCmd) + } + return err + } + return nil +} + +func firewallRuleEqual(a, b *compute.Firewall) bool { + return a.Description == b.Description && + len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) && + a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol && + utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) && + utils.EqualStringSets(a.SourceRanges, b.SourceRanges) && + utils.EqualStringSets(a.TargetTags, b.TargetTags) +} diff --git a/pkg/healthchecks/healthchecks_l4.go b/pkg/healthchecks/healthchecks_l4.go new file mode 100644 index 0000000000..60dfbb59ae --- /dev/null +++ b/pkg/healthchecks/healthchecks_l4.go @@ -0,0 +1,156 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecks + +import ( + "fmt" + cloudprovider "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/ingress-gce/pkg/composite" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" +) + +const ( + + // L4 Load Balancer parameters + gceHcCheckIntervalSeconds = int64(8) + gceHcTimeoutSeconds = int64(1) + // Start sending requests as soon as one healthcheck succeeds. + gceHcHealthyThreshold = int64(1) + // Defaults to 3 * 8 = 24 seconds before the LB will steer traffic away. + gceHcUnhealthyThreshold = int64(3) +) + +// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, based on the parameters provided. +// If the healthcheck already exists, it is updated as needed. +func EnsureL4HealthCheck(cloud *gce.Cloud, name string, svcName types.NamespacedName, shared bool, path string, port int32) (*composite.HealthCheck, string, error) { + selfLink := "" + key, err := composite.CreateKey(cloud, name, meta.Global) + if err != nil { + return nil, selfLink, fmt.Errorf("Failed to create composite key for healthcheck %s - %v", name, err) + } + hc, err := composite.GetHealthCheck(cloud, key, meta.VersionGA) + if err != nil { + if !utils.IsNotFoundError(err) { + return nil, selfLink, err + } + } + expectedHC := NewL4HealthCheck(name, svcName, shared, path, port) + if hc == nil { + // Create the healthcheck + klog.Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared) + err = composite.CreateHealthCheck(cloud, key, expectedHC) + if err != nil { + return nil, selfLink, err + } + selfLink = cloudprovider.SelfLink(meta.VersionGA, cloud.ProjectID(), "healthChecks", key) + return expectedHC, selfLink, nil + } + selfLink = hc.SelfLink + if !needToUpdateHealthChecks(hc, expectedHC) { + // nothing to do + return hc, selfLink, nil + } + mergeHealthChecks(hc, expectedHC) + klog.Infof("Updating healthcheck %s for service %s", name, svcName) + err = composite.UpdateHealthCheck(cloud, key, expectedHC) + if err != nil { + return nil, selfLink, err + } + return expectedHC, selfLink, err +} + +func DeleteHealthCheck(cloud *gce.Cloud, name string) error { + key, err := composite.CreateKey(cloud, name, meta.Global) + if err != nil { + return fmt.Errorf("Failed to create composite key for healthcheck %s - %v", name, err) + } + return composite.DeleteHealthCheck(cloud, key, meta.VersionGA) +} + +func HealthCheckName(shared bool, clusteruid, lbName string) (string, string) { + hcName := lbName + fwName := lbName + if shared { + hcName = clusteruid + "l4-shared-hc" + fwName = clusteruid + "l4-shared-hc-fw" + } + return hcName, fwName +} + +func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) *composite.HealthCheck { + httpSettings := composite.HTTPHealthCheck{ + Port: int64(port), + RequestPath: path, + } + desc := "" + var err error + + if !shared { + desc, err = utils.MakeL4ILBServiceDescription(svcName.String(), "", meta.VersionGA) + if err != nil { + klog.Warningf("Failed to generate description for L4HealthCheck %s, err %v", name, err) + } + } + return &composite.HealthCheck{ + Name: name, + CheckIntervalSec: gceHcCheckIntervalSeconds, + TimeoutSec: gceHcTimeoutSeconds, + HealthyThreshold: gceHcHealthyThreshold, + UnhealthyThreshold: gceHcUnhealthyThreshold, + HttpHealthCheck: &httpSettings, + Type: "HTTP", + Description: desc, + } +} + +// mergeHealthChecks reconciles HealthCheck configures to be no smaller than +// the default values. +// E.g. old health check interval is 2s, new default is 8. +// The HC interval will be reconciled to 8 seconds. +// If the existing health check is larger than the default interval, +// the configuration will be kept. +func mergeHealthChecks(hc, newHC *composite.HealthCheck) { + if hc.CheckIntervalSec > newHC.CheckIntervalSec { + newHC.CheckIntervalSec = hc.CheckIntervalSec + } + if hc.TimeoutSec > newHC.TimeoutSec { + newHC.TimeoutSec = hc.TimeoutSec + } + if hc.UnhealthyThreshold > newHC.UnhealthyThreshold { + newHC.UnhealthyThreshold = hc.UnhealthyThreshold + } + if hc.HealthyThreshold > newHC.HealthyThreshold { + newHC.HealthyThreshold = hc.HealthyThreshold + } +} + +// needToUpdateHealthChecks checks whether the healthcheck needs to be updated. +func needToUpdateHealthChecks(hc, newHC *composite.HealthCheck) bool { + return hc.HttpHealthCheck == nil || + newHC.HttpHealthCheck == nil || + hc.HttpHealthCheck.Port != newHC.HttpHealthCheck.Port || + hc.HttpHealthCheck.RequestPath != newHC.HttpHealthCheck.RequestPath || + hc.Description != newHC.Description || + hc.CheckIntervalSec < newHC.CheckIntervalSec || + hc.TimeoutSec < newHC.TimeoutSec || + hc.UnhealthyThreshold < newHC.UnhealthyThreshold || + hc.HealthyThreshold < newHC.HealthyThreshold +} diff --git a/pkg/loadbalancers/forwarding_rules.go b/pkg/loadbalancers/forwarding_rules.go index 959f62c1bf..e3ea926311 100644 --- a/pkg/loadbalancers/forwarding_rules.go +++ b/pkg/loadbalancers/forwarding_rules.go @@ -19,12 +19,17 @@ package loadbalancers import ( "fmt" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" ) const ( @@ -181,3 +186,168 @@ func (l *L7) getEffectiveIP() (string, bool) { } return "", true } + +// getForwardingRule with the given name +// rule with the given name exists with the correct parameters and creates a forwarding rule if +// it does not exist already. If shouldExist parameter is true, this function returns if forwarding rule is not found. +func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.ILBOptions) (*composite.ForwardingRule, error) { + key, err := l.CreateKey(loadBalancerName) + if err != nil { + return nil, err + } + desc := utils.L4ILBResourceDescription{} + // version used for creating the existing forwarding rule. + existingVersion := meta.VersionGA + // version to use for the new forwarding rule + newVersion := getAPIVersion(options) + + // Get the GA version forwarding rule, use the description to identify the version it was created with. + existingFwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionGA) + if utils.IgnoreHTTPNotFound(err) != nil { + return nil, err + } + if existingFwdRule != nil { + if err = desc.Unmarshal(existingFwdRule.Description); err != nil { + klog.Warningf("Failed to lookup forwarding rule version from description, err %v. Using GA Version.", err) + } else { + existingVersion = desc.APIVersion + } + } + // Fetch the right forwarding rule in case it is not using GA + if existingVersion != meta.VersionGA { + existingFwdRule, err = composite.GetForwardingRule(l.cloud, key, existingVersion) + if utils.IgnoreHTTPNotFound(err) != nil { + klog.Errorf("Failed to lookup forwarding rule '%s' at version - %s, err %v", key.Name, existingVersion, err) + return nil, err + } + } + + if l.cloud.IsLegacyNetwork() { + l.recorder.Event(l.Service, v1.EventTypeWarning, "ILBOptionsIgnored", "Internal LoadBalancer options are not supported with Legacy Networks.") + options = gce.ILBOptions{} + } + subnetworkURL := l.cloud.SubnetworkURL() + + if !l.cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBCustomSubnet) { + if options.SubnetName != "" { + l.recorder.Event(l.Service, v1.EventTypeWarning, "ILBCustomSubnetOptionIgnored", "Internal LoadBalancer CustomSubnet options ignored as the feature gate is disabled.") + options.SubnetName = "" + } + } + if l.cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBCustomSubnet) { + // If this feature is enabled, changes to subnet annotation will be + // picked up and reflected in the forwarding rule. + // Removing the annotation will set the forwarding rule to use the default subnet. + if options.SubnetName != "" { + subnetKey := *key + subnetKey.Name = options.SubnetName + subnetworkURL = cloud.SelfLink(meta.VersionGA, l.cloud.ProjectID(), "subnetworks", &subnetKey) + } + } else { + // TODO(84885) remove this once ILBCustomSubnet goes beta. + if existingFwdRule != nil && existingFwdRule.Subnetwork != "" { + // If the ILB already exists, continue using the subnet that it's already using. + // This is to support existing ILBs that were setup using the wrong subnet - https://github.com/kubernetes/kubernetes/pull/57861 + subnetworkURL = existingFwdRule.Subnetwork + } + } + // Determine IP which will be used for this LB. If no forwarding rule has been established + // or specified in the Service spec, then requestedIP = "". + ipToUse := ilbIPToUse(l.Service, existingFwdRule, subnetworkURL) + klog.V(2).Infof("ensureForwardingRule(%v): Using subnet %s for LoadBalancer IP %s", loadBalancerName, options.SubnetName, ipToUse) + + var addrMgr *addressManager + // If the network is not a legacy network, use the address manager + if !l.cloud.IsLegacyNetwork() { + nm := types.NamespacedName{Namespace: l.Service.Namespace, Name: l.Service.Name}.String() + addrMgr = newAddressManager(l.cloud, nm, l.cloud.Region(), subnetworkURL, loadBalancerName, ipToUse, cloud.SchemeInternal) + ipToUse, err = addrMgr.HoldAddress() + if err != nil { + return nil, err + } + klog.V(2).Infof("ensureForwardingRule(%v): reserved IP %q for the forwarding rule", loadBalancerName, ipToUse) + } + + ports, _, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) + // add firewall rule before the forwarding rule + + // Create the forwarding rule + frDesc, err := utils.MakeL4ILBServiceDescription(utils.ServiceKeyFunc(l.Service.Namespace, l.Service.Name), ipToUse, + newVersion) + if err != nil { + return nil, fmt.Errorf("Failed to compute description for forwarding rule %s, err: %v", loadBalancerName, + err) + } + + fr := &composite.ForwardingRule{ + Name: loadBalancerName, + IPAddress: ipToUse, + Ports: ports, + IPProtocol: string(protocol), + LoadBalancingScheme: string(cloud.SchemeInternal), + Subnetwork: subnetworkURL, + Network: l.cloud.NetworkURL(), + Version: newVersion, + BackendService: bsLink, + AllowGlobalAccess: options.AllowGlobalAccess, + Description: frDesc, + } + + if existingFwdRule != nil { + if Equal(existingFwdRule, fr) { + // nothing to do + klog.V(2).Infof("ensureForwardingRule: Skipping update of unchanged forwarding rule - %s", fr.Name) + return existingFwdRule, nil + } else { + klog.V(2).Infof("ensureForwardingRule: Recreating forwarding rule - %s", fr.Name) + if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, existingVersion)); err != nil { + return nil, err + } + } + } + if err = composite.CreateForwardingRule(l.cloud, key, fr); err != nil { + return nil, err + } + + return composite.GetForwardingRule(l.cloud, key, fr.Version) +} + +func Equal(fr1, fr2 *composite.ForwardingRule) bool { + // If one of the IP addresses is empty, do not consider it as an inequality. + // If the IP address drops from a valid IP to empty, we do not want to apply + // the change if it is the only change in the forwarding rule. Similarly, if + // the forwarding rule changes from an empty IP to an allocated IP address, the + // subnetwork will change as well. + return (fr1.IPAddress == "" || fr2.IPAddress == "" || fr1.IPAddress == fr2.IPAddress) && + fr1.IPProtocol == fr2.IPProtocol && + fr1.LoadBalancingScheme == fr2.LoadBalancingScheme && + utils.EqualStringSets(fr1.Ports, fr2.Ports) && + fr1.BackendService == fr2.BackendService && + fr1.AllowGlobalAccess == fr2.AllowGlobalAccess && + fr1.Subnetwork == fr2.Subnetwork +} + +// ilbIPToUse determines which IP address needs to be used in the ForwardingRule. If an IP has been +// specified by the user, that is used. If there is an existing ForwardingRule, the ip address from +// that is reused. In case a subnetwork change is requested, the existing ForwardingRule IP is ignored. +func ilbIPToUse(svc *v1.Service, fwdRule *composite.ForwardingRule, requestedSubnet string) string { + if svc.Spec.LoadBalancerIP != "" { + return svc.Spec.LoadBalancerIP + } + if fwdRule == nil { + return "" + } + if requestedSubnet != fwdRule.Subnetwork { + // reset ip address since subnet is being changed. + return "" + } + return fwdRule.IPAddress +} + +// getAPIVersion returns the API version to use for CRUD of Forwarding rules, given the options enabled. +func getAPIVersion(options gce.ILBOptions) meta.Version { + if options.AllowGlobalAccess { + return meta.VersionBeta + } + return meta.VersionGA +} diff --git a/pkg/loadbalancers/l4.go b/pkg/loadbalancers/l4.go new file mode 100644 index 0000000000..ad3cfe6aa5 --- /dev/null +++ b/pkg/loadbalancers/l4.go @@ -0,0 +1,196 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancers + +import ( + "strconv" + + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/cloud-provider/service/helpers" + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/composite" + "k8s.io/ingress-gce/pkg/firewalls" + "k8s.io/ingress-gce/pkg/healthchecks" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/namer" + "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" +) + +// Many of the functions in this file are re-implemented from gce_loadbalancer_internal.go +// L4 handles the resource creation/deletion/update for a given L4 ILB service. +type L4 struct { + cloud *gce.Cloud + backendPool *backends.Backends + scope meta.KeyType + namer *namer.Namer + // recorder is used to generate k8s Events. + recorder record.EventRecorder + Service *corev1.Service + ServicePort utils.ServicePort + NamespacedName types.NamespacedName +} + +// NewL4Handler creates a new L4Handler for the given L4 service. +func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer *namer.Namer, recorder record.EventRecorder) *L4 { + l := &L4{cloud: cloud, scope: scope, namer: namer, recorder: recorder, Service: service} + l.NamespacedName = types.NamespacedName{Name: service.Name, Namespace: service.Namespace} + l.backendPool = backends.NewPool(l.cloud, l.namer) + l.ServicePort = utils.ServicePort{ID: utils.ServicePortID{Service: l.NamespacedName}, BackendNamer: l.namer, + PrimaryIPNEGEnabled: true} + return l +} + +// CreateKey generates a meta.Key for a given GCE resource name. +func (l *L4) CreateKey(name string) (*meta.Key, error) { + return composite.CreateKey(l.cloud, name, l.scope) +} + +// getILBOptions fetches the optional features requested on the given ILB service. +func getILBOptions(svc *corev1.Service) gce.ILBOptions { + return gce.ILBOptions{AllowGlobalAccess: gce.GetLoadBalancerAnnotationAllowGlobalAccess(svc), + SubnetName: gce.GetLoadBalancerAnnotationSubnet(svc)} + +} + +// EnsureInternalLoadBalancerDeleted performs a cleanup of all GCE resources for the given loadbalancer service. +func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) error { + sharedHC := !helpers.RequestsOnlyLocalTraffic(svc) + name := l.namer.PrimaryIPNEG(svc.Namespace, svc.Name) + key, err := l.CreateKey(name) + if err != nil { + klog.Errorf("Failed to delete loadbalancer resources with name %s for service %s", name, l.NamespacedName.String()) + return err + } + err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, getAPIVersion(getILBOptions(l.Service)))) + if err != nil { + klog.Errorf("Failed to delete forwarding rule for internal loadbalancer %s - %v", l.NamespacedName.String(), err) + return err + } + hcName, hcFwName := healthchecks.HealthCheckName(sharedHC, l.namer.UID(), name) + // delete fw rules + deleteFunc := func(name string) error { + err := firewalls.EnsureL4InternalFirewallRuleDeleted(l.cloud, name) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + l.recorder.Eventf(l.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil + } + err = deleteFunc(name) + if err != nil { + return err + } + + err = deleteFunc(hcFwName) + if err != nil { + return err + } + // delete backend service + err = utils.IgnoreHTTPNotFound(l.backendPool.Delete(name, meta.VersionGA, meta.Regional)) + if err != nil { + klog.Errorf("Failed to delete backends for internal loadbalancer %s - %v", l.NamespacedName.String(), err) + return err + } + + // Delete healthcheck + err = utils.IgnoreHTTPNotFound(healthchecks.DeleteHealthCheck(l.cloud, hcName)) + if err != nil { + // This error will be hit if this is a shared healthcheck. + if utils.IsInUsedByError(err) { + klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName) + return nil + } + klog.Errorf("Failed to delete healthcheck for internal loadbalancer %s, err %v", l.NamespacedName.String(), err) + return err + } + return nil +} + +// EnsureInternalLoadBalancer ensures that all GCE resources for the given loadbalancer service have +// been created. It returns a LoadBalancerStatus with the updated ForwardingRule IP address. +func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) (*corev1.LoadBalancerStatus, error) { + // Lookup existing forwarding rule and make sure it exists. + // Use the same resource name for NEG, BackendService as well as FR, FWRule. + l.Service = svc + name := l.namer.PrimaryIPNEG(l.Service.Namespace, l.Service.Name) + options := getILBOptions(l.Service) + + // create healthcheck + sharedHC := !helpers.RequestsOnlyLocalTraffic(l.Service) + hcName, hcFwName := healthchecks.HealthCheckName(sharedHC, l.namer.UID(), name) + hcPath, hcPort := gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort() + if !sharedHC { + hcPath, hcPort = helpers.GetServiceHealthCheckPathPort(l.Service) + } + _, hcLink, err := healthchecks.EnsureL4HealthCheck(l.cloud, hcName, l.NamespacedName, sharedHC, hcPath, hcPort) + if err != nil { + return nil, err + } + + _, portRanges, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) + + // ensure firewalls + sourceRanges, err := helpers.GetLoadBalancerSourceRanges(l.Service) + if err != nil { + return nil, err + } + hcSourceRanges := gce.L4LoadBalancerSrcRanges() + ensureFunc := func(name, IP string, sourceRanges, portRanges []string, proto string) error { + err := firewalls.EnsureL4InternalFirewallRule(l.cloud, name, IP, l.Service.Name, sourceRanges, portRanges, nodeNames, proto) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + l.recorder.Eventf(l.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil + } + // Add firewall rule for ILB traffic to nodes + err = ensureFunc(name, "", sourceRanges.StringSlice(), portRanges, string(protocol)) + if err != nil { + return nil, err + } + + // Add firewall rule for healthchecks to nodes + err = ensureFunc(hcFwName, "", hcSourceRanges, []string{strconv.Itoa(int(hcPort))}, string(corev1.ProtocolTCP)) + if err != nil { + return nil, err + } + // ensure backend service + bs, err := l.backendPool.EnsureL4BackendService(name, hcLink, string(protocol), string(l.Service.Spec.SessionAffinity), + string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA) + if err != nil { + return nil, err + } + + // create fr rule + fr, err := l.ensureForwardingRule(name, bs.SelfLink, options) + if err != nil { + klog.Errorf("EnsureInternalLoadBalancer: Failed to create forwarding rule - %v", err) + return nil, err + } + return &corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{{IP: fr.IPAddress}}}, nil +} diff --git a/pkg/utils/common/finalizer.go b/pkg/utils/common/finalizer.go index 4c23506baf..de07803295 100644 --- a/pkg/utils/common/finalizer.go +++ b/pkg/utils/common/finalizer.go @@ -16,8 +16,10 @@ package common import ( "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/api/networking/v1beta1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" client "k8s.io/client-go/kubernetes/typed/networking/v1beta1" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/slice" @@ -29,11 +31,11 @@ const ( // FinalizerKeyV2 is the string representing the Ingress finalizer version. // Ingress with V2 finalizer uses V2 frontend naming scheme. FinalizerKeyV2 = "networking.gke.io/ingress-finalizer-V2" - // FinalizerKeyL4 is the string representing the L4 ILB controller finalizer in this repo. - FinalizerKeyL4 = "networking.gke.io/l4-ilb-v2" - // FinalizerKeyL4V1 is the string representing the service controller finalizer. A service with this finalizer - // is managed by k/k service controller. - FinalizerKeyL4V1 = "networking.gke.io/l4-ilb-v1" + // TODO remove the 2 definitions once they are added in legacy-cloud-providers/gce + // LegacyILBFinalizer key is used to identify ILB services whose resources are managed by service controller. + LegacyILBFinalizer = "gke.networking.io/l4-ilb-v1" + // ILBFinalizerV2 is the finalizer used by newer controllers that implement Internal LoadBalancer services. + ILBFinalizerV2 = "gke.networking.io/l4-ilb-v2" ) // IsDeletionCandidate is true if the passed in meta contains an ingress finalizer. @@ -88,3 +90,33 @@ func EnsureDeleteFinalizer(ing *v1beta1.Ingress, ingClient client.IngressInterfa } return nil } + +// EnsureServiceFinalizer patches the service to add finalizer. +func EnsureServiceFinalizer(service *v1.Service, key string, kubeClient kubernetes.Interface) error { + if HasGivenFinalizer(service.ObjectMeta, key) { + return nil + } + + // Make a copy so we don't mutate the shared informer cache. + updated := service.DeepCopy() + updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, key) + + klog.V(2).Infof("Adding finalizer %s to service %s/%s", key, updated.Namespace, updated.Name) + _, err := kubeClient.CoreV1().Services(updated.Namespace).Update(updated) + return err +} + +// removeFinalizer patches the service to remove finalizer. +func EnsureDeleteServiceFinalizer(service *v1.Service, key string, kubeClient kubernetes.Interface) error { + if !HasGivenFinalizer(service.ObjectMeta, key) { + return nil + } + + // Make a copy so we don't mutate the shared informer cache. + updated := service.DeepCopy() + updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, key, nil) + + klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name) + _, err := kubeClient.CoreV1().Services(updated.Namespace).Update(updated) + return err +} diff --git a/pkg/utils/namer/interfaces.go b/pkg/utils/namer/interfaces.go index 41160b6d09..8cd431714c 100644 --- a/pkg/utils/namer/interfaces.go +++ b/pkg/utils/namer/interfaces.go @@ -55,6 +55,8 @@ type BackendNamer interface { // NEG returns the gce neg name based on the service namespace, name // and target port. NEG(namespace, name string, Port int32) string + // PrimaryIPNEG returns the gce neg name based on the service namespace and name + PrimaryIPNEG(namespace, name string) string // InstanceGroup constructs the name for an Instance Group. InstanceGroup() string // NamedPort returns the name for a named port. diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index baa6221676..78be3a6e98 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -24,6 +24,8 @@ import ( "strings" "sync" + "k8s.io/ingress-gce/pkg/utils/common" + "k8s.io/klog" ) @@ -479,7 +481,8 @@ func (n *Namer) PrimaryIPNEG(namespace, name string) string { truncNamespace := truncFields[0] truncName := truncFields[1] // Use the full cluster UID in the suffix to reduce chance of collision. - return fmt.Sprintf("%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, negSuffix(n.UID(), namespace, name, "", "")) + return fmt.Sprintf("%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, + primaryIPNegSuffix(n.UID(), namespace, name)) } // IsNEG returns true if the name is a NEG owned by this cluster. @@ -494,6 +497,11 @@ func (n *Namer) negPrefix() string { return fmt.Sprintf("%s%s-%s", n.prefix, schemaVersionV1, n.shortUID()) } +// primaryIPNegSuffix returns an 8 character hash code to be used as suffix in GCE_PRIMARY_VM_IP NEG. +func primaryIPNegSuffix(uid, namespace, name string) string { + return common.ContentHash(strings.Join([]string{uid, namespace, name}, ";"), 8) +} + // negSuffix returns hash code with 8 characters func negSuffix(uid, namespace, name, port, subset string) string { negString := strings.Join([]string{uid, namespace, name, port}, ";") diff --git a/pkg/utils/serviceport.go b/pkg/utils/serviceport.go index 842538dd15..eb7bda09ca 100644 --- a/pkg/utils/serviceport.go +++ b/pkg/utils/serviceport.go @@ -46,20 +46,21 @@ type ServicePort struct { NodePort int64 // Numerical port of the Service, retrieved from the Service - Port int32 - Protocol annotations.AppProtocol - TargetPort string - NEGEnabled bool - L7ILBEnabled bool - BackendConfig *backendconfigv1beta1.BackendConfig - BackendNamer namer.BackendNamer + Port int32 + Protocol annotations.AppProtocol + TargetPort string + NEGEnabled bool + PrimaryIPNEGEnabled bool + L7ILBEnabled bool + BackendConfig *backendconfigv1beta1.BackendConfig + BackendNamer namer.BackendNamer } // GetAPIVersionFromServicePort returns the compute API version to be used // for creating NEGs associated with the given ServicePort. func GetAPIVersionFromServicePort(sp *ServicePort) meta.Version { - if sp == nil { - // this uses GCE_VM_PRIMARY_IP NEGS which requires alpha API + if sp.PrimaryIPNEGEnabled { + // this uses VM_PRIMARY_IP_NEGS which requires alpha API return meta.VersionAlpha } return meta.VersionGA @@ -77,6 +78,8 @@ func (sp ServicePort) GetDescription() Description { func (sp ServicePort) BackendName() string { if sp.NEGEnabled { return sp.BackendNamer.NEG(sp.ID.Service.Namespace, sp.ID.Service.Name, sp.Port) + } else if sp.PrimaryIPNEGEnabled { + return sp.BackendNamer.PrimaryIPNEG(sp.ID.Service.Namespace, sp.ID.Service.Name) } return sp.BackendNamer.IGBackend(sp.NodePort) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 62f2bf28ac..9bf22d9203 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -22,14 +22,20 @@ import ( "errors" "fmt" "net/http" + "sort" + "strconv" "strings" + "k8s.io/kubernetes/pkg/util/slice" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" api_v1 "k8s.io/api/core/v1" "k8s.io/api/networking/v1beta1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/annotations" @@ -70,7 +76,8 @@ const ( LabelNodeRoleExcludeBalancer = "alpha.service-controller.kubernetes.io/exclude-balancer" // ToBeDeletedTaint is the taint that the autoscaler adds when a node is scheduled to be deleted // https://github.com/kubernetes/autoscaler/blob/cluster-autoscaler-0.5.2/cluster-autoscaler/utils/deletetaint/delete.go#L33 - ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" + ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" + L4ILBServiceDescKey = "networking.gke.io/service-name" ) // FrontendGCAlgorithm species GC algorithm used for ingress frontend resources. @@ -85,6 +92,10 @@ const ( // CleanupV2FrontendResources specifies that frontend resources for ingresses // that use v2 naming scheme need to be deleted. CleanupV2FrontendResources + // AffinityTypeNone - no session affinity. + gceAffinityTypeNone = "NONE" + // AffinityTypeClientIP - affinity based on Client IP. + gceAffinityTypeClientIP = "CLIENT_IP" ) // FakeGoogleAPIForbiddenErr creates a Forbidden error with type googleapi.Error @@ -429,13 +440,118 @@ func NumEndpoints(ep *api_v1.Endpoints) (result int) { return result } +// EqualStringSets returns true if 2 given string slices contain the same elements, in any order. +func EqualStringSets(x, y []string) bool { + if len(x) != len(y) { + return false + } + xString := sets.NewString(x...) + yString := sets.NewString(y...) + return xString.Equal(yString) +} + +// GetPortRanges returns a list of port ranges, given a list of ports. +func GetPortRanges(ports []int) (ranges []string) { + if len(ports) < 1 { + return ranges + } + sort.Ints(ports) + + start := ports[0] + prev := ports[0] + for ix, current := range ports { + switch { + case current == prev: + // Loop over duplicates, except if the end of list is reached. + if ix == len(ports)-1 { + if start == current { + ranges = append(ranges, fmt.Sprintf("%d", current)) + } else { + ranges = append(ranges, fmt.Sprintf("%d-%d", start, current)) + } + } + case current == prev+1: + // continue the streak, create the range if this is the last element in the list. + if ix == len(ports)-1 { + ranges = append(ranges, fmt.Sprintf("%d-%d", start, current)) + } + default: + // current is not prev + 1, streak is broken. Construct the range and handle last element case. + if start == prev { + ranges = append(ranges, fmt.Sprintf("%d", prev)) + } else { + ranges = append(ranges, fmt.Sprintf("%d-%d", start, prev)) + } + if ix == len(ports)-1 { + ranges = append(ranges, fmt.Sprintf("%d", current)) + } + // reset start element + start = current + } + prev = current + } + return ranges +} + +// GetPortsAndProtocol returns the list of ports, list of port ranges and the protocol given the list of k8s port info. +func GetPortsAndProtocol(svcPorts []api_v1.ServicePort) (ports []string, portRanges []string, protocol api_v1.Protocol) { + if len(svcPorts) == 0 { + return []string{}, []string{}, api_v1.ProtocolUDP + } + + // GCP doesn't support multiple protocols for a single load balancer + protocol = svcPorts[0].Protocol + portInts := []int{} + for _, p := range svcPorts { + ports = append(ports, strconv.Itoa(int(p.Port))) + portInts = append(portInts, int(p.Port)) + } + + return ports, GetPortRanges(portInts), protocol +} + +// TranslateAffinityType converts the k8s affinity type to the GCE affinity type. +func TranslateAffinityType(affinityType string) string { + switch affinityType { + case string(api_v1.ServiceAffinityClientIP): + return gceAffinityTypeClientIP + case string(api_v1.ServiceAffinityNone): + return gceAffinityTypeNone + default: + klog.Errorf("Unexpected affinity type: %v", affinityType) + return gceAffinityTypeNone + } +} + // IsLegacyL4ILBService returns true if the given LoadBalancer service is managed by service controller. func IsLegacyL4ILBService(svc *api_v1.Service) bool { - for _, key := range svc.ObjectMeta.Finalizers { - if key == common.FinalizerKeyL4V1 { - // service has v1 finalizer, this is handled by service controller code. - return true - } + return slice.ContainsString(svc.ObjectMeta.Finalizers, common.LegacyILBFinalizer, nil) +} + +// L4ILBResourceDescription stores the description fields for L4 ILB resources. +// This is useful to indetify which resources correspond to which L4 ILB service. +type L4ILBResourceDescription struct { + // ServiceName indicates the name of the service the resource is for. + ServiceName string `json:"networking.gke.io/service-name"` + // APIVersion stores the version og the compute API used to create this resource. + APIVersion meta.Version `json:"networking.gke.io/api-version,omitempty"` + ServiceIP string `json:"networking.gke.io/service-ip,omitempty"` +} + +// Marshal returns the description as a JSON-encoded string. +func (d *L4ILBResourceDescription) Marshal() (string, error) { + out, err := json.Marshal(d) + if err != nil { + return "", err } - return false + return string(out), err +} + +// Unmarshal converts the JSON-encoded description string into the struct. +func (d *L4ILBResourceDescription) Unmarshal(desc string) error { + return json.Unmarshal([]byte(desc), d) +} + +func MakeL4ILBServiceDescription(svcName, ip string, version meta.Version) (string, error) { + return (&L4ILBResourceDescription{ServiceName: svcName, ServiceIP: ip, APIVersion: version}).Marshal() } diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 6541e6fca7..583e4af086 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils/common" @@ -762,7 +762,7 @@ func TestIsLegacyL4ILBService(t *testing.T) { Name: "testsvc", Namespace: "default", Annotations: map[string]string{gce.ServiceAnnotationLoadBalancerType: string(gce.LBTypeInternal)}, - Finalizers: []string{common.FinalizerKeyL4V1}, + Finalizers: []string{common.LegacyILBFinalizer}, }, Spec: api_v1.ServiceSpec{ Type: api_v1.ServiceTypeLoadBalancer,