Skip to content

Commit

Permalink
Rewrite L4 healthchecks creation and deletion
Browse files Browse the repository at this point in the history
- create a common singleton like struct fot l4 health checks
- new struct holds mutex for common resources
- delete shared healtcheck firewall rules safely
  • Loading branch information
cezarygerard committed May 5, 2022
1 parent 2781c36 commit a421fe8
Show file tree
Hide file tree
Showing 14 changed files with 427 additions and 214 deletions.
3 changes: 3 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"fmt"
"k8s.io/ingress-gce/pkg/healthchecks"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -274,6 +275,8 @@ func runControllers(ctx *ingctx.ControllerContext) {

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

healthchecks.Initialize(ctx.Cloud, ctx)

if flags.F.RunL4Controller {
l4Controller := l4lb.NewILBController(ctx, stopCh)
go l4Controller.Run()
Expand Down
2 changes: 2 additions & 0 deletions hack/run-local-glbc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ ${GLBC} \
--running-in-cluster=false \
--logtostderr --v=${V} \
--config-file-path=${GCECONF} \
--run-l4-controller \
--run-l4-netlb-controller \
"${@}" \
2>&1 | tee -a /tmp/glbc.log
6 changes: 3 additions & 3 deletions pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
if err != nil {
return err
}
fwDesc, err := utils.MakeL4LBServiceDescription(nsName, params.IP, meta.VersionGA, sharedRule, params.L4Type)
fwDesc, err := utils.MakeL4LBFirewallDescription(nsName, params.IP, meta.VersionGA, sharedRule)
if err != nil {
klog.Warningf("EnsureL4FirewallRule(%v): failed to generate description for L4 %s rule, err: %v", params.Name, params.L4Type.ToString(), err)
}
Expand Down Expand Up @@ -104,8 +104,8 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error {
}

func firewallRuleEqual(a, b *compute.Firewall) bool {
return a.Description == b.Description &&
len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) &&
// let's skip description not to trigger flood of updates
return len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) &&
a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
utils.EqualStringSets(a.SourceRanges, b.SourceRanges) &&
Expand Down
162 changes: 147 additions & 15 deletions pkg/healthchecks/healthchecks_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package healthchecks

import (
"fmt"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/firewalls"
"strconv"
"sync"

cloudprovider "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand All @@ -42,44 +47,103 @@ const (
gceHcUnhealthyThreshold = int64(3)
)

var instance *l4HealthChecks

type l4HealthChecks struct {
mutex sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
}

func Initialize(cloud *gce.Cloud, recorderFactory events.RecorderProducer) {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
}
func Fake(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4HealthChecks {
return &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
}

func GetL4() *l4HealthChecks {
return instance
}

// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, based on the parameters provided.
// If the healthcheck already exists, it is updated as needed.
func EnsureL4HealthCheck(cloud *gce.Cloud, svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, string, int32, string, error) {
func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) {
// mutex?
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
hcPath, hcPort := gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
if !sharedHC {
hcPath, hcPort = helpers.GetServiceHealthCheckPathPort(svc)
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()

}
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type)
if err != nil {
return "", "", "", annotations.HealthcheckResource, err
}
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return "", "", "", annotations.FirewallForHealthcheckResource, err
}

return hcLink, hcFwName, hcName, "", err
}

func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) {
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
if sharedHC {
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
}
NamespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope))
if err != nil {
if !utils.IsInUsedByError(err) {
klog.Errorf("Failed to delete healthcheck for service %s - %v", NamespacedName.String(), err)
return annotations.HealthcheckResource, err
}
// Ignore deletion error due to health check in use by another resource.
// This will be hit if this is a shared healthcheck.
klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName)
return "", nil
}
nn := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := ensureL4HealthCheckInternal(cloud, hcName, nn, sharedHC, hcPath, hcPort, scope, l4Type)
return hcLink, hcFwName, hcPort, hcName, err
// Health check deleted, now delete the firewall rule
return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type)
}

func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
selfLink := ""
key, err := composite.CreateKey(cloud, name, scope)
key, err := composite.CreateKey(l4hc.cloud, name, scope)
if err != nil {
return nil, selfLink, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
return nil, selfLink, fmt.Errorf("Failed to create key for for healthcheck with name %s for service %s", name, svcName.String())
}
hc, err := composite.GetHealthCheck(cloud, key, meta.VersionGA)
hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
if err != nil {
if !utils.IsNotFoundError(err) {
return nil, selfLink, err
}
}
var region string
if scope == meta.Regional {
region = cloud.Region()
region = l4hc.cloud.Region()
}
expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region)
if hc == nil {
// Create the healthcheck
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared)
err = composite.CreateHealthCheck(cloud, key, expectedHC)
err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
selfLink = cloudprovider.SelfLink(meta.VersionGA, cloud.ProjectID(), "healthChecks", key)
selfLink = cloudprovider.SelfLink(meta.VersionGA, l4hc.cloud.ProjectID(), "healthChecks", key)
return expectedHC, selfLink, nil
}
selfLink = hc.SelfLink
Expand All @@ -89,19 +153,87 @@ func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.Na
}
mergeHealthChecks(hc, expectedHC)
klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName)
err = composite.UpdateHealthCheck(cloud, key, expectedHC)
err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
return expectedHC, selfLink, err
}

func DeleteHealthCheck(cloud *gce.Cloud, name string, scope meta.KeyType) error {
key, err := composite.CreateKey(cloud, name, scope)
func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error {
// Add firewall rule for healthchecks to nodes
hcFWRParams := firewalls.FirewallParams{
PortRanges: []string{strconv.Itoa(int(hcPort))},
SourceRanges: gce.L4LoadBalancerSrcRanges(),
Protocol: string(corev1.ProtocolTCP),
Name: hcFwName,
NodeNames: nodeNames,
}
err := firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, &l4hc.mutex, l4hc.recorderFactory.Recorder(svc.Namespace))
if err != nil {
return err
}
return nil
}

func (l4hc *l4HealthChecks) deleteHealthCheck(name string, scope meta.KeyType) error {
// always check mutex
key, err := composite.CreateKey(l4hc.cloud, name, scope)
if err != nil {
return fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
}
return composite.DeleteHealthCheck(cloud, key, meta.VersionGA)
return composite.DeleteHealthCheck(l4hc.cloud, key, meta.VersionGA)
}

func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcName, hcFwName string, sharedHC bool, l4Type utils.L4LBType) (string, error) {
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

safeToDelete, err := l4hc.healthCheckFirewallSafeToDelete(hcName, sharedHC, l4Type)
if err != nil {
klog.Errorf("Failed to delete health check firewall rule %s for service %s - %v", hcFwName, namespacedName.String(), err)
return annotations.HealthcheckResource, err
}
if !safeToDelete {
klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
return "", nil
}
// Delete healthcheck firewall rule if no healthcheck uses the firewall rule
err = l4hc.deleteFirewall(hcFwName, svc)
if err != nil {
klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", hcFwName, namespacedName.String(), err)
return annotations.FirewallForHealthcheckResource, err
}
return "", nil
}

func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, sharedHC bool, l4Type utils.L4LBType) (bool, error) {
if !sharedHC {
return true, nil
}
var scopeToCheck meta.KeyType
scopeToCheck = meta.Regional
if l4Type == utils.XLB {
scopeToCheck = meta.Global
}
key, err := composite.CreateKey(l4hc.cloud, hcName, scopeToCheck)
if err != nil {
return false, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", hcName, err)
}
_, err = composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
return utils.IsNotFoundError(err), nil
}

func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) error {
err := firewalls.EnsureL4FirewallRuleDeleted(l4hc.cloud, name)
if err != nil {
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return err
}
return nil
}

func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
Expand Down
4 changes: 2 additions & 2 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (l4c *L4Controller) shouldProcessService(service *v1.Service, l4 *loadbalan
// processServiceCreateOrUpdate ensures load balancer resources for the given service, as needed.
// Returns an error if processing the service update failed.
func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Service) *loadbalancers.L4ILBSyncResult {
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace), &l4c.sharedResourcesLock)
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace))
if !l4c.shouldProcessService(service, l4) {
return nil
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}

func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) *loadbalancers.L4ILBSyncResult {
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace), &l4c.sharedResourcesLock)
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace))
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key)
result := l4.EnsureInternalLoadBalancerDeleted(svc)
if result.Error != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/l4lb/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package l4lb
import (
context2 "context"
"fmt"
"k8s.io/ingress-gce/pkg/healthchecks"
"testing"
"time"

Expand Down Expand Up @@ -70,6 +71,7 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}
healthchecks.Initialize(ctx.Cloud, ctx)
return NewILBController(ctx, stopCh)
}

Expand Down
13 changes: 5 additions & 8 deletions pkg/l4lb/l4netlbcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ package l4lb

import (
"fmt"
"reflect"
"sync"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
Expand All @@ -38,6 +35,7 @@ import (
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
"reflect"
)

const l4NetLBControllerName = "l4netlb-controller"
Expand All @@ -54,8 +52,7 @@ type L4NetLBController struct {
// enqueueTracker tracks the latest time an update was enqueued
enqueueTracker utils.TimeTracker
// syncTracker tracks the latest time an enqueued service was synced
syncTracker utils.TimeTracker
sharedResourcesLock sync.Mutex
syncTracker utils.TimeTracker

backendPool *backends.Backends
instancePool instances.NodePool
Expand Down Expand Up @@ -236,7 +233,7 @@ func (lc *L4NetLBController) hasForwardingRuleAnnotation(svc *v1.Service, frName

// hasRBSForwardingRule checks if services loadbalancer has forwarding rule pointing to backend service
func (lc *L4NetLBController) hasRBSForwardingRule(svc *v1.Service) bool {
l4netlb := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace), &lc.sharedResourcesLock)
l4netlb := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace))
frName := l4netlb.GetFRName()
// to optimize number of api calls, at first, check if forwarding rule exists in annotation
if lc.hasForwardingRuleAnnotation(svc, frName) {
Expand Down Expand Up @@ -323,7 +320,7 @@ func (lc *L4NetLBController) sync(key string) error {
// syncInternal ensures load balancer resources for the given service, as needed.
// Returns an error if processing the service update failed.
func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4NetLBSyncResult {
l4netlb := loadbalancers.NewL4NetLB(service, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(service.Namespace), &lc.sharedResourcesLock)
l4netlb := loadbalancers.NewL4NetLB(service, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(service.Namespace))
// check again that rbs is enabled.
if !lc.isRBSBasedService(service) {
klog.Infof("Skipping syncInternal. Service %s does not have RBS enabled", service.Name)
Expand Down Expand Up @@ -402,7 +399,7 @@ func (lc *L4NetLBController) ensureInstanceGroups(service *v1.Service, nodeNames

// garbageCollectRBSNetLB cleans-up all gce resources related to service and removes NetLB finalizer
func (lc *L4NetLBController) garbageCollectRBSNetLB(key string, svc *v1.Service) *loadbalancers.L4NetLBSyncResult {
l4netLB := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace), &lc.sharedResourcesLock)
l4netLB := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace))
lc.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer",
"Deleting L4 External LoadBalancer for %s", key)

Expand Down
7 changes: 3 additions & 4 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -240,7 +239,7 @@ func newL4NetLBServiceController() *L4NetLBController {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}

healthchecks.Initialize(ctx.Cloud, ctx)
return NewL4NetLBController(ctx, stopCh)
}

Expand Down Expand Up @@ -836,7 +835,7 @@ func TestHealthCheckWhenExternalTrafficPolicyWasUpdated(t *testing.T) {
// delete shared health check if is created, update service to Cluster and
// check that non-shared health check was created
hcNameShared, _ := lc.namer.L4HealthCheck(svc.Namespace, svc.Name, true)
healthchecks.DeleteHealthCheck(lc.ctx.Cloud, hcNameShared, meta.Regional)
healthchecks.Fake(lc.ctx.Cloud, lc.ctx).DeleteHealthCheck(svc, lc.namer, true, meta.Regional, utils.XLB)
// Update ExternalTrafficPolicy to Cluster check if shared HC was created
err = updateAndAssertExternalTrafficPolicy(newSvc, lc, v1.ServiceExternalTrafficPolicyTypeCluster, hcNameShared)
if err != nil {
Expand Down Expand Up @@ -972,7 +971,7 @@ func TestIsRBSBasedService(t *testing.T) {
func TestIsRBSBasedServiceWithILBServices(t *testing.T) {
controller := newL4NetLBServiceController()
ilbSvc := test.NewL4ILBService(false, 8080)
ilbFrName := loadbalancers.NewL4Handler(ilbSvc, controller.ctx.Cloud, meta.Regional, controller.namer, record.NewFakeRecorder(100), &sync.Mutex{}).GetFRName()
ilbFrName := loadbalancers.NewL4Handler(ilbSvc, controller.ctx.Cloud, meta.Regional, controller.namer, record.NewFakeRecorder(100)).GetFRName()
ilbSvc.Annotations = map[string]string{
annotations.TCPForwardingRuleKey: ilbFrName,
annotations.UDPForwardingRuleKey: ilbFrName,
Expand Down
Loading

0 comments on commit a421fe8

Please sign in to comment.