diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 88e8a91bc4..0e57f463fa 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -218,6 +218,12 @@ func runControllers(ctx *ingctx.ControllerContext) { fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values()) + if flags.F.RunL4Controller { + l4Controller := controller.NewL4Controller(ctx, stopCh) + go l4Controller.Run() + klog.V(0).Infof("L4 controller started") + } + // TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController. negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller) diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index f8687cdca8..aaced1c350 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -19,6 +19,8 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-gce/pkg/backends/features" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/utils" @@ -27,6 +29,10 @@ import ( "k8s.io/legacy-cloud-providers/gce" ) +const ( + DefaultConnectionDrainingTimeoutSeconds = 30 +) + // Backends handles CRUD operations for backends. type Backends struct { cloud *gce.Cloud @@ -224,3 +230,91 @@ func (b *Backends) List(key *meta.Key, version meta.Version) ([]*composite.Backe } return clusterBackends, nil } + +// EnsureL4BackendService creates or updates the backend service with the given name. +func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, version meta.Version) (*composite.BackendService, error) { + klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): checking existing backend service", name, scheme, protocol) + key, err := composite.CreateKey(b.cloud, name, meta.Regional) + bs, err := composite.GetBackendService(b.cloud, key, meta.VersionGA) + if err != nil && !utils.IsNotFoundError(err) { + return nil, err + } + desc, err := utils.MakeL4ILBServiceDescription(nm.String(), "", meta.VersionGA) + if err != nil { + klog.Warningf("EnsureL4BackendService: Failed to generate description for BackendService %s, err %v", + name, err) + } + expectedBS := &composite.BackendService{ + Name: name, + Protocol: string(protocol), + Description: desc, + HealthChecks: []string{hcLink}, + SessionAffinity: utils.TranslateAffinityType(sessionAffinity), + LoadBalancingScheme: string(scheme), + ConnectionDraining: &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds}, + } + + // Create backend service if none was found + if bs == nil { + klog.V(2).Infof("EnsureL4BackendService: creating backend service %v", name) + err := composite.CreateBackendService(b.cloud, key, expectedBS) + if err != nil { + return nil, err + } + klog.V(2).Infof("EnsureL4BackendService: created backend service %v successfully", name) + // We need to perform a GCE call to re-fetch the object we just created + // so that the "Fingerprint" field is filled in. This is needed to update the + // object without error. The lookup is also needed to populate the selfLink. 
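+		// The selfLink is consumed by the caller when creating the forwarding rule, which references the backend service by its link.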
+ return composite.GetBackendService(b.cloud, key, meta.VersionGA) + } + + if backendSvcEqual(expectedBS, bs) { + return bs, nil + } + if bs.ConnectionDraining != nil && bs.ConnectionDraining.DrainingTimeoutSec > 0 { + // if user overrides this value, continue using that. + expectedBS.ConnectionDraining.DrainingTimeoutSec = bs.ConnectionDraining.DrainingTimeoutSec + } + klog.V(2).Infof("EnsureL4BackendService: updating backend service %v", name) + // Set fingerprint for optimistic locking + expectedBS.Fingerprint = bs.Fingerprint + if err := composite.UpdateBackendService(b.cloud, key, expectedBS); err != nil { + return nil, err + } + klog.V(2).Infof("EnsureL4BackendService: updated backend service %v successfully", name) + return composite.GetBackendService(b.cloud, key, meta.VersionGA) +} + +// backendsListEqual asserts that backend lists are equal by group link only +func backendsListEqual(a, b []*composite.Backend) bool { + if len(a) != len(b) { + return false + } + if len(a) == 0 { + return true + } + + aSet := sets.NewString() + for _, v := range a { + aSet.Insert(v.Group) + } + bSet := sets.NewString() + for _, v := range b { + bSet.Insert(v.Group) + } + + return aSet.Equal(bSet) +} + +// backendSvcEqual returns true if the 2 BackendService objects are equal. +// ConnectionDraining timeout is not checked for equality, if user changes +// this timeout and no other backendService parameters change, the backend +// service will not be updated. +func backendSvcEqual(a, b *composite.BackendService) bool { + return a.Protocol == b.Protocol && + a.Description == b.Description && + a.SessionAffinity == b.SessionAffinity && + a.LoadBalancingScheme == b.LoadBalancingScheme && + utils.EqualStringSets(a.HealthChecks, b.HealthChecks) && + backendsListEqual(a.Backends, b.Backends) +} diff --git a/pkg/backends/features/features.go b/pkg/backends/features/features.go index e3c5776cc2..471afc39de 100644 --- a/pkg/backends/features/features.go +++ b/pkg/backends/features/features.go @@ -36,6 +36,8 @@ const ( // FeatureL7ILB defines the feature name of L7 Internal Load Balancer // L7-ILB Resources are currently alpha and regional FeatureL7ILB = "L7ILB" + //FeaturePrimaryVMIPNEG defines the feature name of GCE_PRIMARY_VM_IP NEGs which are used for L4 ILB. 
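+	// Backend services using this feature are regional resources (see scopeToFeatures below).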
+ FeaturePrimaryVMIPNEG = "PrimaryVMIPNEG" ) var ( @@ -46,7 +48,7 @@ var ( } // TODO: (shance) refactor all scope to be above the serviceport level scopeToFeatures = map[meta.KeyType][]string{ - meta.Regional: []string{FeatureL7ILB}, + meta.Regional: []string{FeatureL7ILB, FeaturePrimaryVMIPNEG}, } ) @@ -68,6 +70,9 @@ func featuresFromServicePort(sp *utils.ServicePort) []string { if sp.NEGEnabled { features = append(features, FeatureNEG) } + if sp.PrimaryIPNEGEnabled { + features = append(features, FeaturePrimaryVMIPNEG) + } if sp.L7ILBEnabled { features = append(features, FeatureL7ILB) } diff --git a/pkg/backends/syncer.go b/pkg/backends/syncer.go index a7b331bc6e..7885eca0e7 100644 --- a/pkg/backends/syncer.go +++ b/pkg/backends/syncer.go @@ -179,6 +179,10 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error { // gc deletes the provided backends func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets.String) error { for _, be := range backends { + // Skip L4 LB backend services + if strings.Contains(be.Description, utils.L4ILBServiceDescKey) { + continue + } var key *meta.Key name := be.Name scope, err := composite.ScopeFromSelfLink(be.SelfLink) @@ -191,7 +195,6 @@ func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets if knownPorts.Has(key.String()) { continue } - klog.V(2).Infof("GCing backendService for port %s", name) err = s.backendPool.Delete(name, be.Version, scope) if err != nil { diff --git a/pkg/controller/service_controller.go b/pkg/controller/service_controller.go new file mode 100644 index 0000000000..b89e44823b --- /dev/null +++ b/pkg/controller/service_controller.go @@ -0,0 +1,259 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "reflect" + + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/controller/translator" + "k8s.io/ingress-gce/pkg/loadbalancers" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/common" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/apis/core/v1/helper" +) + +// L4Controller manages the create/update delete of all L4 Internal LoadBalancer services. +type L4Controller struct { + ctx *context.ControllerContext + // kubeClient, needed for attaching finalizer + client kubernetes.Interface + svcQueue utils.TaskQueue + serviceLister cache.Indexer + nodeLister listers.NodeLister + stopCh chan struct{} + // mapping of service key to L4Handler that creates all resources. + handlerMap map[string]*loadbalancers.L4 + // needed for listing the zones in the cluster. 
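+	// linkNEG uses the zone list to attach one NEG group key per zone to the backend service.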
+ translator *translator.Translator + // needed for linking the NEG with the backend service for each ILB service. + NegLinker backends.Linker + backendPool *backends.Backends +} + +// NewL4Controller creates a new instance of the L4 ILB controller. +func NewL4Controller(ctx *context.ControllerContext, stopCh chan struct{}) *L4Controller { + l4c := &L4Controller{ + ctx: ctx, + client: ctx.KubeClient, + serviceLister: ctx.ServiceInformer.GetIndexer(), + nodeLister: listers.NewNodeLister(ctx.NodeInformer.GetIndexer()), + stopCh: stopCh, + } + l4c.translator = translator.NewTranslator(ctx) + l4c.backendPool = backends.NewPool(ctx.Cloud, ctx.ClusterNamer) + l4c.NegLinker = backends.NewNEGLinker(l4c.backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud) + + l4c.handlerMap = make(map[string]*loadbalancers.L4) + l4c.svcQueue = utils.NewPeriodicTaskQueue("l4", "services", l4c.sync) + ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addSvc := obj.(*v1.Service) + svcKey := utils.ServiceKeyFunc(addSvc.Namespace, addSvc.Name) + if common.HasGivenFinalizer(addSvc.ObjectMeta, common.FinalizerKeyV2) { + // This might be an update or delete showing up as Add since the controller probably exited. + l4c.ctx.Recorder(addSvc.Namespace).Eventf(addSvc, v1.EventTypeNormal, "ADD", svcKey) + l4c.svcQueue.Enqueue(addSvc) + return + } + if needsILB, svcType := annotations.WantsL4ILB(addSvc); !needsILB { + klog.V(4).Infof("Ignoring add for non-lb service %s based on %v", svcKey, svcType) + return + } + klog.V(3).Infof("Service %s added, enqueuing", svcKey) + l4c.ctx.Recorder(addSvc.Namespace).Eventf(addSvc, v1.EventTypeNormal, "ADD", svcKey) + l4c.svcQueue.Enqueue(addSvc) + }, + // Deletes will be handled in the Update when the deletion timestamp is set. + UpdateFunc: func(old, cur interface{}) { + curSvc := cur.(*v1.Service) + svcKey := utils.ServiceKeyFunc(curSvc.Namespace, curSvc.Name) + if common.HasGivenFinalizer(curSvc.ObjectMeta, common.FinalizerKeyV2) { + if reflect.DeepEqual(old, cur) { + // TODO when will this be true? + klog.V(3).Infof("Periodic enqueueing of %v", svcKey) + } else { + klog.V(3).Infof("Service %v changed, enqueuing", svcKey) + } + l4c.svcQueue.Enqueue(curSvc) + } + }, + }) + // TODO enhance this by looking at some metric from service controller to ensure it is up. + // We cannot use existence of a backend service or other resource, since those are on a per-service basis. + ctx.AddHealthCheck("service-controller health", func() error { return nil }) + return l4c +} + +func (l4c *L4Controller) Run() { + defer l4c.shutdown() + go l4c.svcQueue.Run() + <-l4c.stopCh +} + +// This should only be called when the process is being terminated. +func (l4c *L4Controller) shutdown() { + klog.Infof("Shutting down L4 Service Controller") + l4c.svcQueue.Shutdown() +} + +func (l4c *L4Controller) getL4Handler(svcKey string, service *v1.Service) *loadbalancers.L4 { + l4, ok := l4c.handlerMap[svcKey] + if !ok { + l4 = loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.ctx.ClusterNamer, l4c.ctx.Recorder(service.Namespace)) + l4c.handlerMap[svcKey] = l4 + } + if service != nil { + l4.Service = service + } + return l4 +} + +// processServiceCreateOrUpdate ensures load balancer resources for the given service, as needed. +// Returns an error if processing the service update failed. 
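+// The steps are: attach the ILBFinalizerV2 finalizer, ensure the GCE resources via EnsureInternalLoadBalancer,
+// link the zonal NEGs to the backend service, and publish the resulting IP in the service status.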
+func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Service) error { + // skip services that are being handled by the legacy service controller. + if utils.IsLegacyL4ILBService(service) { + klog.Warningf("Ignoring update for service %s:%s managed by service controller", + service.Namespace, service.Name) + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerSkipped", + fmt.Sprintf("skipping l4 load balancer sync as service contains '%s' finalizer", common.LegacyILBFinalizer)) + return nil + } + + // Ensure v2 finalizer + if err := common.EnsureServiceFinalizer(service, common.ILBFinalizerV2, l4c.ctx.KubeClient); err != nil { + return fmt.Errorf("Failed to attach finalizer to service %s/%s, err %v", service.Namespace, service.Name, err) + } + l4 := l4c.getL4Handler(key, service) + nodeNames, err := utils.GetReadyNodeNames(l4c.nodeLister) + if err != nil { + return err + } + // Use the same function for both create and updates. If controller crashes and restarts, + // all existing services will show up as Service Adds. + status, err := l4.EnsureInternalLoadBalancer(nodeNames, service) + if err != nil { + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", + "Error syncing load balancer: %v", err) + return err + } + if status == nil { + return fmt.Errorf("service status returned by EnsureInternalLoadBalancer is nil") + } + if err = l4c.linkNEG(l4); err != nil { + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", + "Failed to link NEG with Backend Service for load balancer, err: %v", err) + return err + } + err = l4c.updateServiceStatus(service, status) + if err != nil { + l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", + "Error updating load balancer status: %v", err) + return err + } + return nil +} + +func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) error { + l4 := l4c.getL4Handler(key, svc) + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key) + if err := l4.EnsureInternalLoadBalancerDeleted(svc); err != nil { + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err) + return err + } + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") + if err := common.EnsureDeleteServiceFinalizer(svc, common.ILBFinalizerV2, l4c.ctx.KubeClient); err != nil { + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancerFailed", + "Error removing finalizer from load balancer: %v", err) + return err + } + // Reset the loadbalancer status, Ignore NotFound error since the service can already be deleted at this point. + if err := l4c.updateServiceStatus(svc, &v1.LoadBalancerStatus{}); utils.IgnoreHTTPNotFound(err) != nil { + l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer", + "Error reseting load balancer status to empty: %v", err) + return err + } + return nil +} + +// linkNEG associates the NEG to the backendService for the given L4 ILB service. 
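+// The NEGs themselves are managed by the NEG controller; this only attaches them, one group key per zone,
+// to the regional backend service.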
+func (l4c *L4Controller) linkNEG(l4 *loadbalancers.L4) error { + // link neg to backend service + zones, err := l4c.translator.ListZones() + if err != nil { + return nil + } + var groupKeys []backends.GroupKey + for _, zone := range zones { + groupKeys = append(groupKeys, backends.GroupKey{Zone: zone}) + } + return l4c.NegLinker.Link(l4.ServicePort, groupKeys) +} + +func (l4c *L4Controller) sync(key string) error { + svc, exists, err := l4c.ctx.Services().GetByKey(key) + if err != nil { + return fmt.Errorf("Failed to lookup service for key %s : %s", key, err) + } + if !exists || svc == nil { + // The service will not exist if its resources and finalizer are handled by the legacy service controller and + // it has been deleted. As long as the V2 finalizer is present, the service will not be deleted by apiserver. + klog.V(3).Infof("Ignoring delete of service %s not managed by L4 controller", key) + return nil + } + if common.IsDeletionCandidateForGivenFinalizer(svc.ObjectMeta, common.ILBFinalizerV2) { + klog.V(2).Infof("Deleting ILB resources for service %s managed by L4 controller", key) + return l4c.processServiceDeletion(key, svc) + } + needsILB, svcType := annotations.WantsL4ILB(svc) + if !needsILB { + klog.V(2).Infof("Service %s of type %s does not require L4 ILB, cleaning up resources", key, svcType) + return l4c.processServiceDeletion(key, svc) + } + return l4c.processServiceCreateOrUpdate(key, svc) +} + +// TODO move to patch instead of update +func (l4c *L4Controller) updateServiceStatus(svc *v1.Service, newStatus *v1.LoadBalancerStatus) error { + if helper.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) { + return nil + } + svcClient := l4c.ctx.KubeClient.CoreV1().Services(svc.Namespace) + // Get the current service since NEG controller would have modified it by adding annotation + current, err := svcClient.Get(svc.Name, metav1.GetOptions{}) + if err != nil { + return err + } + updated := current.DeepCopy() + updated.Status.LoadBalancer = *newStatus + if _, err := svcClient.UpdateStatus(updated); err != nil { + return err + } + return nil +} diff --git a/pkg/firewalls/firewalls_l4.go b/pkg/firewalls/firewalls_l4.go new file mode 100644 index 0000000000..00d436298a --- /dev/null +++ b/pkg/firewalls/firewalls_l4.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package firewalls + +import ( + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/compute/v1" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" + "strings" +) + +func EnsureL4InternalFirewallRule(cloud *gce.Cloud, fwName, lbIP, svcName string, sourceRanges, portRanges, nodeNames []string, proto string) error { + existingFw, err := cloud.GetFirewall(fwName) + if err != nil && !utils.IsNotFoundError(err) { + return err + } + + nodeTags, err := cloud.GetNodeTags(nodeNames) + if err != nil { + return err + } + fwDesc, err := utils.MakeL4ILBServiceDescription(svcName, lbIP, meta.VersionGA) + if err != nil { + klog.Warningf("EnsureL4InternalFirewallRule: Failed to generate description for rule %s, err: %v", + fwName, err) + } + expectedFw := &compute.Firewall{ + Name: fwName, + Description: fwDesc, + Network: cloud.NetworkURL(), + SourceRanges: sourceRanges, + TargetTags: nodeTags, + Allowed: []*compute.FirewallAllowed{ + { + IPProtocol: strings.ToLower(proto), + Ports: portRanges, + }, + }, + } + if existingFw == nil { + klog.V(2).Infof("EnsureL4InternalFirewallRule(%v): creating firewall", fwName) + err = cloud.CreateFirewall(expectedFw) + if utils.IsForbiddenError(err) && cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudCreateCmd(expectedFw, cloud.NetworkProjectID()) + + klog.V(3).Infof("EnsureL4InternalFirewallRule(%v): Could not create L4 firewall on XPN cluster: %v. Raising event for cmd: %q", fwName, err, gcloudCmd) + return newFirewallXPNError(err, gcloudCmd) + } + return err + } + if firewallRuleEqual(expectedFw, existingFw) { + return nil + } + klog.V(2).Infof("EnsureL4InternalFirewallRule(%v): updating firewall", fwName) + err = cloud.UpdateFirewall(expectedFw) + if utils.IsForbiddenError(err) && cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudUpdateCmd(expectedFw, cloud.NetworkProjectID()) + klog.V(3).Infof("EnsureL4InternalFirewallRule(%v): Could not update L4 firewall on XPN cluster: %v. Raising event for cmd: %q", fwName, err, gcloudCmd) + return newFirewallXPNError(err, gcloudCmd) + } + return err +} + +func EnsureL4InternalFirewallRuleDeleted(cloud *gce.Cloud, fwName string) error { + if err := utils.IgnoreHTTPNotFound(cloud.DeleteFirewall(fwName)); err != nil { + if utils.IsForbiddenError(err) && cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudDeleteCmd(fwName, cloud.NetworkProjectID()) + klog.V(2).Infof("EnsureL4InternalFirewallRuleDeleted(%v): could not delete traffic firewall on XPN cluster. Raising event.", fwName) + return newFirewallXPNError(err, gcloudCmd) + } + return err + } + return nil +} + +func firewallRuleEqual(a, b *compute.Firewall) bool { + return a.Description == b.Description && + len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) && + a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol && + utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) && + utils.EqualStringSets(a.SourceRanges, b.SourceRanges) && + utils.EqualStringSets(a.TargetTags, b.TargetTags) +} diff --git a/pkg/healthchecks/healthchecks_l4.go b/pkg/healthchecks/healthchecks_l4.go new file mode 100644 index 0000000000..60dfbb59ae --- /dev/null +++ b/pkg/healthchecks/healthchecks_l4.go @@ -0,0 +1,156 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecks + +import ( + "fmt" + cloudprovider "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/ingress-gce/pkg/composite" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" +) + +const ( + + // L4 Load Balancer parameters + gceHcCheckIntervalSeconds = int64(8) + gceHcTimeoutSeconds = int64(1) + // Start sending requests as soon as one healthcheck succeeds. + gceHcHealthyThreshold = int64(1) + // Defaults to 3 * 8 = 24 seconds before the LB will steer traffic away. + gceHcUnhealthyThreshold = int64(3) +) + +// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, based on the parameters provided. +// If the healthcheck already exists, it is updated as needed. +func EnsureL4HealthCheck(cloud *gce.Cloud, name string, svcName types.NamespacedName, shared bool, path string, port int32) (*composite.HealthCheck, string, error) { + selfLink := "" + key, err := composite.CreateKey(cloud, name, meta.Global) + if err != nil { + return nil, selfLink, fmt.Errorf("Failed to create composite key for healthcheck %s - %v", name, err) + } + hc, err := composite.GetHealthCheck(cloud, key, meta.VersionGA) + if err != nil { + if !utils.IsNotFoundError(err) { + return nil, selfLink, err + } + } + expectedHC := NewL4HealthCheck(name, svcName, shared, path, port) + if hc == nil { + // Create the healthcheck + klog.Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared) + err = composite.CreateHealthCheck(cloud, key, expectedHC) + if err != nil { + return nil, selfLink, err + } + selfLink = cloudprovider.SelfLink(meta.VersionGA, cloud.ProjectID(), "healthChecks", key) + return expectedHC, selfLink, nil + } + selfLink = hc.SelfLink + if !needToUpdateHealthChecks(hc, expectedHC) { + // nothing to do + return hc, selfLink, nil + } + mergeHealthChecks(hc, expectedHC) + klog.Infof("Updating healthcheck %s for service %s", name, svcName) + err = composite.UpdateHealthCheck(cloud, key, expectedHC) + if err != nil { + return nil, selfLink, err + } + return expectedHC, selfLink, err +} + +func DeleteHealthCheck(cloud *gce.Cloud, name string) error { + key, err := composite.CreateKey(cloud, name, meta.Global) + if err != nil { + return fmt.Errorf("Failed to create composite key for healthcheck %s - %v", name, err) + } + return composite.DeleteHealthCheck(cloud, key, meta.VersionGA) +} + +func HealthCheckName(shared bool, clusteruid, lbName string) (string, string) { + hcName := lbName + fwName := lbName + if shared { + hcName = clusteruid + "l4-shared-hc" + fwName = clusteruid + "l4-shared-hc-fw" + } + return hcName, fwName +} + +func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) *composite.HealthCheck { + httpSettings := composite.HTTPHealthCheck{ + Port: int64(port), + RequestPath: path, + } + desc := "" + var err error + + if !shared { + desc, err = utils.MakeL4ILBServiceDescription(svcName.String(), "", meta.VersionGA) + if err != nil { + 
klog.Warningf("Failed to generate description for L4HealthCheck %s, err %v", name, err) + } + } + return &composite.HealthCheck{ + Name: name, + CheckIntervalSec: gceHcCheckIntervalSeconds, + TimeoutSec: gceHcTimeoutSeconds, + HealthyThreshold: gceHcHealthyThreshold, + UnhealthyThreshold: gceHcUnhealthyThreshold, + HttpHealthCheck: &httpSettings, + Type: "HTTP", + Description: desc, + } +} + +// mergeHealthChecks reconciles HealthCheck configures to be no smaller than +// the default values. +// E.g. old health check interval is 2s, new default is 8. +// The HC interval will be reconciled to 8 seconds. +// If the existing health check is larger than the default interval, +// the configuration will be kept. +func mergeHealthChecks(hc, newHC *composite.HealthCheck) { + if hc.CheckIntervalSec > newHC.CheckIntervalSec { + newHC.CheckIntervalSec = hc.CheckIntervalSec + } + if hc.TimeoutSec > newHC.TimeoutSec { + newHC.TimeoutSec = hc.TimeoutSec + } + if hc.UnhealthyThreshold > newHC.UnhealthyThreshold { + newHC.UnhealthyThreshold = hc.UnhealthyThreshold + } + if hc.HealthyThreshold > newHC.HealthyThreshold { + newHC.HealthyThreshold = hc.HealthyThreshold + } +} + +// needToUpdateHealthChecks checks whether the healthcheck needs to be updated. +func needToUpdateHealthChecks(hc, newHC *composite.HealthCheck) bool { + return hc.HttpHealthCheck == nil || + newHC.HttpHealthCheck == nil || + hc.HttpHealthCheck.Port != newHC.HttpHealthCheck.Port || + hc.HttpHealthCheck.RequestPath != newHC.HttpHealthCheck.RequestPath || + hc.Description != newHC.Description || + hc.CheckIntervalSec < newHC.CheckIntervalSec || + hc.TimeoutSec < newHC.TimeoutSec || + hc.UnhealthyThreshold < newHC.UnhealthyThreshold || + hc.HealthyThreshold < newHC.HealthyThreshold +} diff --git a/pkg/loadbalancers/forwarding_rules.go b/pkg/loadbalancers/forwarding_rules.go index 959f62c1bf..e3ea926311 100644 --- a/pkg/loadbalancers/forwarding_rules.go +++ b/pkg/loadbalancers/forwarding_rules.go @@ -19,12 +19,17 @@ package loadbalancers import ( "fmt" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" ) const ( @@ -181,3 +186,168 @@ func (l *L7) getEffectiveIP() (string, bool) { } return "", true } + +// getForwardingRule with the given name +// rule with the given name exists with the correct parameters and creates a forwarding rule if +// it does not exist already. If shouldExist parameter is true, this function returns if forwarding rule is not found. +func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.ILBOptions) (*composite.ForwardingRule, error) { + key, err := l.CreateKey(loadBalancerName) + if err != nil { + return nil, err + } + desc := utils.L4ILBResourceDescription{} + // version used for creating the existing forwarding rule. + existingVersion := meta.VersionGA + // version to use for the new forwarding rule + newVersion := getAPIVersion(options) + + // Get the GA version forwarding rule, use the description to identify the version it was created with. 
+ existingFwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionGA) + if utils.IgnoreHTTPNotFound(err) != nil { + return nil, err + } + if existingFwdRule != nil { + if err = desc.Unmarshal(existingFwdRule.Description); err != nil { + klog.Warningf("Failed to lookup forwarding rule version from description, err %v. Using GA Version.", err) + } else { + existingVersion = desc.APIVersion + } + } + // Fetch the right forwarding rule in case it is not using GA + if existingVersion != meta.VersionGA { + existingFwdRule, err = composite.GetForwardingRule(l.cloud, key, existingVersion) + if utils.IgnoreHTTPNotFound(err) != nil { + klog.Errorf("Failed to lookup forwarding rule '%s' at version - %s, err %v", key.Name, existingVersion, err) + return nil, err + } + } + + if l.cloud.IsLegacyNetwork() { + l.recorder.Event(l.Service, v1.EventTypeWarning, "ILBOptionsIgnored", "Internal LoadBalancer options are not supported with Legacy Networks.") + options = gce.ILBOptions{} + } + subnetworkURL := l.cloud.SubnetworkURL() + + if !l.cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBCustomSubnet) { + if options.SubnetName != "" { + l.recorder.Event(l.Service, v1.EventTypeWarning, "ILBCustomSubnetOptionIgnored", "Internal LoadBalancer CustomSubnet options ignored as the feature gate is disabled.") + options.SubnetName = "" + } + } + if l.cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBCustomSubnet) { + // If this feature is enabled, changes to subnet annotation will be + // picked up and reflected in the forwarding rule. + // Removing the annotation will set the forwarding rule to use the default subnet. + if options.SubnetName != "" { + subnetKey := *key + subnetKey.Name = options.SubnetName + subnetworkURL = cloud.SelfLink(meta.VersionGA, l.cloud.ProjectID(), "subnetworks", &subnetKey) + } + } else { + // TODO(84885) remove this once ILBCustomSubnet goes beta. + if existingFwdRule != nil && existingFwdRule.Subnetwork != "" { + // If the ILB already exists, continue using the subnet that it's already using. + // This is to support existing ILBs that were setup using the wrong subnet - https://github.com/kubernetes/kubernetes/pull/57861 + subnetworkURL = existingFwdRule.Subnetwork + } + } + // Determine IP which will be used for this LB. If no forwarding rule has been established + // or specified in the Service spec, then requestedIP = "". 
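+	// The subnetwork chosen above is passed in so that a pending subnet change invalidates any previously-used IP.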
+ ipToUse := ilbIPToUse(l.Service, existingFwdRule, subnetworkURL) + klog.V(2).Infof("ensureForwardingRule(%v): Using subnet %s for LoadBalancer IP %s", loadBalancerName, options.SubnetName, ipToUse) + + var addrMgr *addressManager + // If the network is not a legacy network, use the address manager + if !l.cloud.IsLegacyNetwork() { + nm := types.NamespacedName{Namespace: l.Service.Namespace, Name: l.Service.Name}.String() + addrMgr = newAddressManager(l.cloud, nm, l.cloud.Region(), subnetworkURL, loadBalancerName, ipToUse, cloud.SchemeInternal) + ipToUse, err = addrMgr.HoldAddress() + if err != nil { + return nil, err + } + klog.V(2).Infof("ensureForwardingRule(%v): reserved IP %q for the forwarding rule", loadBalancerName, ipToUse) + } + + ports, _, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) + // add firewall rule before the forwarding rule + + // Create the forwarding rule + frDesc, err := utils.MakeL4ILBServiceDescription(utils.ServiceKeyFunc(l.Service.Namespace, l.Service.Name), ipToUse, + newVersion) + if err != nil { + return nil, fmt.Errorf("Failed to compute description for forwarding rule %s, err: %v", loadBalancerName, + err) + } + + fr := &composite.ForwardingRule{ + Name: loadBalancerName, + IPAddress: ipToUse, + Ports: ports, + IPProtocol: string(protocol), + LoadBalancingScheme: string(cloud.SchemeInternal), + Subnetwork: subnetworkURL, + Network: l.cloud.NetworkURL(), + Version: newVersion, + BackendService: bsLink, + AllowGlobalAccess: options.AllowGlobalAccess, + Description: frDesc, + } + + if existingFwdRule != nil { + if Equal(existingFwdRule, fr) { + // nothing to do + klog.V(2).Infof("ensureForwardingRule: Skipping update of unchanged forwarding rule - %s", fr.Name) + return existingFwdRule, nil + } else { + klog.V(2).Infof("ensureForwardingRule: Recreating forwarding rule - %s", fr.Name) + if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, existingVersion)); err != nil { + return nil, err + } + } + } + if err = composite.CreateForwardingRule(l.cloud, key, fr); err != nil { + return nil, err + } + + return composite.GetForwardingRule(l.cloud, key, fr.Version) +} + +func Equal(fr1, fr2 *composite.ForwardingRule) bool { + // If one of the IP addresses is empty, do not consider it as an inequality. + // If the IP address drops from a valid IP to empty, we do not want to apply + // the change if it is the only change in the forwarding rule. Similarly, if + // the forwarding rule changes from an empty IP to an allocated IP address, the + // subnetwork will change as well. + return (fr1.IPAddress == "" || fr2.IPAddress == "" || fr1.IPAddress == fr2.IPAddress) && + fr1.IPProtocol == fr2.IPProtocol && + fr1.LoadBalancingScheme == fr2.LoadBalancingScheme && + utils.EqualStringSets(fr1.Ports, fr2.Ports) && + fr1.BackendService == fr2.BackendService && + fr1.AllowGlobalAccess == fr2.AllowGlobalAccess && + fr1.Subnetwork == fr2.Subnetwork +} + +// ilbIPToUse determines which IP address needs to be used in the ForwardingRule. If an IP has been +// specified by the user, that is used. If there is an existing ForwardingRule, the ip address from +// that is reused. In case a subnetwork change is requested, the existing ForwardingRule IP is ignored. 
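+// An empty return value means a new IP will be reserved for the forwarding rule.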
+func ilbIPToUse(svc *v1.Service, fwdRule *composite.ForwardingRule, requestedSubnet string) string { + if svc.Spec.LoadBalancerIP != "" { + return svc.Spec.LoadBalancerIP + } + if fwdRule == nil { + return "" + } + if requestedSubnet != fwdRule.Subnetwork { + // reset ip address since subnet is being changed. + return "" + } + return fwdRule.IPAddress +} + +// getAPIVersion returns the API version to use for CRUD of Forwarding rules, given the options enabled. +func getAPIVersion(options gce.ILBOptions) meta.Version { + if options.AllowGlobalAccess { + return meta.VersionBeta + } + return meta.VersionGA +} diff --git a/pkg/loadbalancers/l4.go b/pkg/loadbalancers/l4.go new file mode 100644 index 0000000000..ad3cfe6aa5 --- /dev/null +++ b/pkg/loadbalancers/l4.go @@ -0,0 +1,196 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancers + +import ( + "strconv" + + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/cloud-provider/service/helpers" + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/composite" + "k8s.io/ingress-gce/pkg/firewalls" + "k8s.io/ingress-gce/pkg/healthchecks" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/namer" + "k8s.io/klog" + "k8s.io/legacy-cloud-providers/gce" +) + +// Many of the functions in this file are re-implemented from gce_loadbalancer_internal.go +// L4 handles the resource creation/deletion/update for a given L4 ILB service. +type L4 struct { + cloud *gce.Cloud + backendPool *backends.Backends + scope meta.KeyType + namer *namer.Namer + // recorder is used to generate k8s Events. + recorder record.EventRecorder + Service *corev1.Service + ServicePort utils.ServicePort + NamespacedName types.NamespacedName +} + +// NewL4Handler creates a new L4Handler for the given L4 service. +func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer *namer.Namer, recorder record.EventRecorder) *L4 { + l := &L4{cloud: cloud, scope: scope, namer: namer, recorder: recorder, Service: service} + l.NamespacedName = types.NamespacedName{Name: service.Name, Namespace: service.Namespace} + l.backendPool = backends.NewPool(l.cloud, l.namer) + l.ServicePort = utils.ServicePort{ID: utils.ServicePortID{Service: l.NamespacedName}, BackendNamer: l.namer, + PrimaryIPNEGEnabled: true} + return l +} + +// CreateKey generates a meta.Key for a given GCE resource name. +func (l *L4) CreateKey(name string) (*meta.Key, error) { + return composite.CreateKey(l.cloud, name, l.scope) +} + +// getILBOptions fetches the optional features requested on the given ILB service. 
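+// Currently these are the global-access and custom-subnet annotations.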
+func getILBOptions(svc *corev1.Service) gce.ILBOptions { + return gce.ILBOptions{AllowGlobalAccess: gce.GetLoadBalancerAnnotationAllowGlobalAccess(svc), + SubnetName: gce.GetLoadBalancerAnnotationSubnet(svc)} + +} + +// EnsureInternalLoadBalancerDeleted performs a cleanup of all GCE resources for the given loadbalancer service. +func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) error { + sharedHC := !helpers.RequestsOnlyLocalTraffic(svc) + name := l.namer.PrimaryIPNEG(svc.Namespace, svc.Name) + key, err := l.CreateKey(name) + if err != nil { + klog.Errorf("Failed to delete loadbalancer resources with name %s for service %s", name, l.NamespacedName.String()) + return err + } + err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, getAPIVersion(getILBOptions(l.Service)))) + if err != nil { + klog.Errorf("Failed to delete forwarding rule for internal loadbalancer %s - %v", l.NamespacedName.String(), err) + return err + } + hcName, hcFwName := healthchecks.HealthCheckName(sharedHC, l.namer.UID(), name) + // delete fw rules + deleteFunc := func(name string) error { + err := firewalls.EnsureL4InternalFirewallRuleDeleted(l.cloud, name) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + l.recorder.Eventf(l.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil + } + err = deleteFunc(name) + if err != nil { + return err + } + + err = deleteFunc(hcFwName) + if err != nil { + return err + } + // delete backend service + err = utils.IgnoreHTTPNotFound(l.backendPool.Delete(name, meta.VersionGA, meta.Regional)) + if err != nil { + klog.Errorf("Failed to delete backends for internal loadbalancer %s - %v", l.NamespacedName.String(), err) + return err + } + + // Delete healthcheck + err = utils.IgnoreHTTPNotFound(healthchecks.DeleteHealthCheck(l.cloud, hcName)) + if err != nil { + // This error will be hit if this is a shared healthcheck. + if utils.IsInUsedByError(err) { + klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName) + return nil + } + klog.Errorf("Failed to delete healthcheck for internal loadbalancer %s, err %v", l.NamespacedName.String(), err) + return err + } + return nil +} + +// EnsureInternalLoadBalancer ensures that all GCE resources for the given loadbalancer service have +// been created. It returns a LoadBalancerStatus with the updated ForwardingRule IP address. +func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) (*corev1.LoadBalancerStatus, error) { + // Lookup existing forwarding rule and make sure it exists. + // Use the same resource name for NEG, BackendService as well as FR, FWRule. 
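+	// The shared name comes from the PrimaryIPNEG namer, so all GCE resources for a service can be correlated.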
+ l.Service = svc + name := l.namer.PrimaryIPNEG(l.Service.Namespace, l.Service.Name) + options := getILBOptions(l.Service) + + // create healthcheck + sharedHC := !helpers.RequestsOnlyLocalTraffic(l.Service) + hcName, hcFwName := healthchecks.HealthCheckName(sharedHC, l.namer.UID(), name) + hcPath, hcPort := gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort() + if !sharedHC { + hcPath, hcPort = helpers.GetServiceHealthCheckPathPort(l.Service) + } + _, hcLink, err := healthchecks.EnsureL4HealthCheck(l.cloud, hcName, l.NamespacedName, sharedHC, hcPath, hcPort) + if err != nil { + return nil, err + } + + _, portRanges, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) + + // ensure firewalls + sourceRanges, err := helpers.GetLoadBalancerSourceRanges(l.Service) + if err != nil { + return nil, err + } + hcSourceRanges := gce.L4LoadBalancerSrcRanges() + ensureFunc := func(name, IP string, sourceRanges, portRanges []string, proto string) error { + err := firewalls.EnsureL4InternalFirewallRule(l.cloud, name, IP, l.Service.Name, sourceRanges, portRanges, nodeNames, proto) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + l.recorder.Eventf(l.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil + } + // Add firewall rule for ILB traffic to nodes + err = ensureFunc(name, "", sourceRanges.StringSlice(), portRanges, string(protocol)) + if err != nil { + return nil, err + } + + // Add firewall rule for healthchecks to nodes + err = ensureFunc(hcFwName, "", hcSourceRanges, []string{strconv.Itoa(int(hcPort))}, string(corev1.ProtocolTCP)) + if err != nil { + return nil, err + } + // ensure backend service + bs, err := l.backendPool.EnsureL4BackendService(name, hcLink, string(protocol), string(l.Service.Spec.SessionAffinity), + string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA) + if err != nil { + return nil, err + } + + // create fr rule + fr, err := l.ensureForwardingRule(name, bs.SelfLink, options) + if err != nil { + klog.Errorf("EnsureInternalLoadBalancer: Failed to create forwarding rule - %v", err) + return nil, err + } + return &corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{{IP: fr.IPAddress}}}, nil +} diff --git a/pkg/utils/common/finalizer.go b/pkg/utils/common/finalizer.go index 4c23506baf..de07803295 100644 --- a/pkg/utils/common/finalizer.go +++ b/pkg/utils/common/finalizer.go @@ -16,8 +16,10 @@ package common import ( "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/api/networking/v1beta1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" client "k8s.io/client-go/kubernetes/typed/networking/v1beta1" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/slice" @@ -29,11 +31,11 @@ const ( // FinalizerKeyV2 is the string representing the Ingress finalizer version. // Ingress with V2 finalizer uses V2 frontend naming scheme. FinalizerKeyV2 = "networking.gke.io/ingress-finalizer-V2" - // FinalizerKeyL4 is the string representing the L4 ILB controller finalizer in this repo. - FinalizerKeyL4 = "networking.gke.io/l4-ilb-v2" - // FinalizerKeyL4V1 is the string representing the service controller finalizer. A service with this finalizer - // is managed by k/k service controller. - FinalizerKeyL4V1 = "networking.gke.io/l4-ilb-v1" + // TODO remove the 2 definitions once they are added in legacy-cloud-providers/gce + // LegacyILBFinalizer key is used to identify ILB services whose resources are managed by service controller. 
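+	// The L4 controller in this repo skips such services (see IsLegacyL4ILBService).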
+ LegacyILBFinalizer = "gke.networking.io/l4-ilb-v1" + // ILBFinalizerV2 is the finalizer used by newer controllers that implement Internal LoadBalancer services. + ILBFinalizerV2 = "gke.networking.io/l4-ilb-v2" ) // IsDeletionCandidate is true if the passed in meta contains an ingress finalizer. @@ -88,3 +90,33 @@ func EnsureDeleteFinalizer(ing *v1beta1.Ingress, ingClient client.IngressInterfa } return nil } + +// EnsureServiceFinalizer patches the service to add finalizer. +func EnsureServiceFinalizer(service *v1.Service, key string, kubeClient kubernetes.Interface) error { + if HasGivenFinalizer(service.ObjectMeta, key) { + return nil + } + + // Make a copy so we don't mutate the shared informer cache. + updated := service.DeepCopy() + updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, key) + + klog.V(2).Infof("Adding finalizer %s to service %s/%s", key, updated.Namespace, updated.Name) + _, err := kubeClient.CoreV1().Services(updated.Namespace).Update(updated) + return err +} + +// removeFinalizer patches the service to remove finalizer. +func EnsureDeleteServiceFinalizer(service *v1.Service, key string, kubeClient kubernetes.Interface) error { + if !HasGivenFinalizer(service.ObjectMeta, key) { + return nil + } + + // Make a copy so we don't mutate the shared informer cache. + updated := service.DeepCopy() + updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, key, nil) + + klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name) + _, err := kubeClient.CoreV1().Services(updated.Namespace).Update(updated) + return err +} diff --git a/pkg/utils/namer/interfaces.go b/pkg/utils/namer/interfaces.go index 41160b6d09..8cd431714c 100644 --- a/pkg/utils/namer/interfaces.go +++ b/pkg/utils/namer/interfaces.go @@ -55,6 +55,8 @@ type BackendNamer interface { // NEG returns the gce neg name based on the service namespace, name // and target port. NEG(namespace, name string, Port int32) string + // PrimaryIPNEG returns the gce neg name based on the service namespace and name + PrimaryIPNEG(namespace, name string) string // InstanceGroup constructs the name for an Instance Group. InstanceGroup() string // NamedPort returns the name for a named port. diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index baa6221676..78be3a6e98 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -24,6 +24,8 @@ import ( "strings" "sync" + "k8s.io/ingress-gce/pkg/utils/common" + "k8s.io/klog" ) @@ -479,7 +481,8 @@ func (n *Namer) PrimaryIPNEG(namespace, name string) string { truncNamespace := truncFields[0] truncName := truncFields[1] // Use the full cluster UID in the suffix to reduce chance of collision. - return fmt.Sprintf("%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, negSuffix(n.UID(), namespace, name, "", "")) + return fmt.Sprintf("%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, + primaryIPNegSuffix(n.UID(), namespace, name)) } // IsNEG returns true if the name is a NEG owned by this cluster. @@ -494,6 +497,11 @@ func (n *Namer) negPrefix() string { return fmt.Sprintf("%s%s-%s", n.prefix, schemaVersionV1, n.shortUID()) } +// primaryIPNegSuffix returns an 8 character hash code to be used as suffix in GCE_PRIMARY_VM_IP NEG. 
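+// Unlike negSuffix, the port and subset are not included in the hash, since a service has a single GCE_PRIMARY_VM_IP NEG name.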
+func primaryIPNegSuffix(uid, namespace, name string) string { + return common.ContentHash(strings.Join([]string{uid, namespace, name}, ";"), 8) +} + // negSuffix returns hash code with 8 characters func negSuffix(uid, namespace, name, port, subset string) string { negString := strings.Join([]string{uid, namespace, name, port}, ";") diff --git a/pkg/utils/serviceport.go b/pkg/utils/serviceport.go index 842538dd15..eb7bda09ca 100644 --- a/pkg/utils/serviceport.go +++ b/pkg/utils/serviceport.go @@ -46,20 +46,21 @@ type ServicePort struct { NodePort int64 // Numerical port of the Service, retrieved from the Service - Port int32 - Protocol annotations.AppProtocol - TargetPort string - NEGEnabled bool - L7ILBEnabled bool - BackendConfig *backendconfigv1beta1.BackendConfig - BackendNamer namer.BackendNamer + Port int32 + Protocol annotations.AppProtocol + TargetPort string + NEGEnabled bool + PrimaryIPNEGEnabled bool + L7ILBEnabled bool + BackendConfig *backendconfigv1beta1.BackendConfig + BackendNamer namer.BackendNamer } // GetAPIVersionFromServicePort returns the compute API version to be used // for creating NEGs associated with the given ServicePort. func GetAPIVersionFromServicePort(sp *ServicePort) meta.Version { - if sp == nil { - // this uses GCE_VM_PRIMARY_IP NEGS which requires alpha API + if sp.PrimaryIPNEGEnabled { + // this uses VM_PRIMARY_IP_NEGS which requires alpha API return meta.VersionAlpha } return meta.VersionGA @@ -77,6 +78,8 @@ func (sp ServicePort) GetDescription() Description { func (sp ServicePort) BackendName() string { if sp.NEGEnabled { return sp.BackendNamer.NEG(sp.ID.Service.Namespace, sp.ID.Service.Name, sp.Port) + } else if sp.PrimaryIPNEGEnabled { + return sp.BackendNamer.PrimaryIPNEG(sp.ID.Service.Namespace, sp.ID.Service.Name) } return sp.BackendNamer.IGBackend(sp.NodePort) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 62f2bf28ac..9bf22d9203 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -22,14 +22,20 @@ import ( "errors" "fmt" "net/http" + "sort" + "strconv" "strings" + "k8s.io/kubernetes/pkg/util/slice" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" api_v1 "k8s.io/api/core/v1" "k8s.io/api/networking/v1beta1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/annotations" @@ -70,7 +76,8 @@ const ( LabelNodeRoleExcludeBalancer = "alpha.service-controller.kubernetes.io/exclude-balancer" // ToBeDeletedTaint is the taint that the autoscaler adds when a node is scheduled to be deleted // https://github.com/kubernetes/autoscaler/blob/cluster-autoscaler-0.5.2/cluster-autoscaler/utils/deletetaint/delete.go#L33 - ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" + ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" + L4ILBServiceDescKey = "networking.gke.io/service-name" ) // FrontendGCAlgorithm species GC algorithm used for ingress frontend resources. @@ -85,6 +92,10 @@ const ( // CleanupV2FrontendResources specifies that frontend resources for ingresses // that use v2 naming scheme need to be deleted. CleanupV2FrontendResources + // AffinityTypeNone - no session affinity. + gceAffinityTypeNone = "NONE" + // AffinityTypeClientIP - affinity based on Client IP. 
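+	// TranslateAffinityType maps the k8s session affinity values onto these GCE constants.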
+ gceAffinityTypeClientIP = "CLIENT_IP" ) // FakeGoogleAPIForbiddenErr creates a Forbidden error with type googleapi.Error @@ -429,13 +440,118 @@ func NumEndpoints(ep *api_v1.Endpoints) (result int) { return result } +// EqualStringSets returns true if 2 given string slices contain the same elements, in any order. +func EqualStringSets(x, y []string) bool { + if len(x) != len(y) { + return false + } + xString := sets.NewString(x...) + yString := sets.NewString(y...) + return xString.Equal(yString) +} + +// GetPortRanges returns a list of port ranges, given a list of ports. +func GetPortRanges(ports []int) (ranges []string) { + if len(ports) < 1 { + return ranges + } + sort.Ints(ports) + + start := ports[0] + prev := ports[0] + for ix, current := range ports { + switch { + case current == prev: + // Loop over duplicates, except if the end of list is reached. + if ix == len(ports)-1 { + if start == current { + ranges = append(ranges, fmt.Sprintf("%d", current)) + } else { + ranges = append(ranges, fmt.Sprintf("%d-%d", start, current)) + } + } + case current == prev+1: + // continue the streak, create the range if this is the last element in the list. + if ix == len(ports)-1 { + ranges = append(ranges, fmt.Sprintf("%d-%d", start, current)) + } + default: + // current is not prev + 1, streak is broken. Construct the range and handle last element case. + if start == prev { + ranges = append(ranges, fmt.Sprintf("%d", prev)) + } else { + ranges = append(ranges, fmt.Sprintf("%d-%d", start, prev)) + } + if ix == len(ports)-1 { + ranges = append(ranges, fmt.Sprintf("%d", current)) + } + // reset start element + start = current + } + prev = current + } + return ranges +} + +// GetPortsAndProtocol returns the list of ports, list of port ranges and the protocol given the list of k8s port info. +func GetPortsAndProtocol(svcPorts []api_v1.ServicePort) (ports []string, portRanges []string, protocol api_v1.Protocol) { + if len(svcPorts) == 0 { + return []string{}, []string{}, api_v1.ProtocolUDP + } + + // GCP doesn't support multiple protocols for a single load balancer + protocol = svcPorts[0].Protocol + portInts := []int{} + for _, p := range svcPorts { + ports = append(ports, strconv.Itoa(int(p.Port))) + portInts = append(portInts, int(p.Port)) + } + + return ports, GetPortRanges(portInts), protocol +} + +// TranslateAffinityType converts the k8s affinity type to the GCE affinity type. +func TranslateAffinityType(affinityType string) string { + switch affinityType { + case string(api_v1.ServiceAffinityClientIP): + return gceAffinityTypeClientIP + case string(api_v1.ServiceAffinityNone): + return gceAffinityTypeNone + default: + klog.Errorf("Unexpected affinity type: %v", affinityType) + return gceAffinityTypeNone + } +} + // IsLegacyL4ILBService returns true if the given LoadBalancer service is managed by service controller. func IsLegacyL4ILBService(svc *api_v1.Service) bool { - for _, key := range svc.ObjectMeta.Finalizers { - if key == common.FinalizerKeyL4V1 { - // service has v1 finalizer, this is handled by service controller code. - return true - } + return slice.ContainsString(svc.ObjectMeta.Finalizers, common.LegacyILBFinalizer, nil) +} + +// L4ILBResourceDescription stores the description fields for L4 ILB resources. +// This is useful to indetify which resources correspond to which L4 ILB service. +type L4ILBResourceDescription struct { + // ServiceName indicates the name of the service the resource is for. 
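+	// Stored as "namespace/name"; its JSON key matches L4ILBServiceDescKey, which backend GC uses to skip L4 ILB backend services.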
+ ServiceName string `json:"networking.gke.io/service-name"` + // APIVersion stores the version og the compute API used to create this resource. + APIVersion meta.Version `json:"networking.gke.io/api-version,omitempty"` + ServiceIP string `json:"networking.gke.io/service-ip,omitempty"` +} + +// Marshal returns the description as a JSON-encoded string. +func (d *L4ILBResourceDescription) Marshal() (string, error) { + out, err := json.Marshal(d) + if err != nil { + return "", err } - return false + return string(out), err +} + +// Unmarshal converts the JSON-encoded description string into the struct. +func (d *L4ILBResourceDescription) Unmarshal(desc string) error { + return json.Unmarshal([]byte(desc), d) +} + +func MakeL4ILBServiceDescription(svcName, ip string, version meta.Version) (string, error) { + return (&L4ILBResourceDescription{ServiceName: svcName, ServiceIP: ip, APIVersion: version}).Marshal() } diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 6541e6fca7..583e4af086 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils/common" @@ -762,7 +762,7 @@ func TestIsLegacyL4ILBService(t *testing.T) { Name: "testsvc", Namespace: "default", Annotations: map[string]string{gce.ServiceAnnotationLoadBalancerType: string(gce.LBTypeInternal)}, - Finalizers: []string{common.FinalizerKeyL4V1}, + Finalizers: []string{common.LegacyILBFinalizer}, }, Spec: api_v1.ServiceSpec{ Type: api_v1.ServiceTypeLoadBalancer,