Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Easier l4 health checks - without singleton #3

Open
wants to merge 5 commits into
base: awesome-firewall-healthchecks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"fmt"
"k8s.io/ingress-gce/pkg/healthchecks"
"math/rand"
"os"
"time"
Expand Down Expand Up @@ -199,6 +200,8 @@ func main() {
EndpointSlicesEnabled: flags.F.EnableEndpointSlices,
}
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, ingParamsClient, svcAttachmentClient, cloud, namer, kubeSystemUID, ctxConfig)
ctx.L4HealthChecks = healthchecks.NewL4(cloud, ctx)

go app.RunHTTPServer(ctx.HealthCheck)

if !flags.F.LeaderElection.LeaderElect {
Expand Down
13 changes: 7 additions & 6 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ package context
import (
context2 "context"
"fmt"
"sync"
"time"

apiv1 "k8s.io/api/core/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -47,6 +44,7 @@ import (
"k8s.io/ingress-gce/pkg/flags"
frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned"
informerfrontendconfig "k8s.io/ingress-gce/pkg/frontendconfig/client/informers/externalversions/frontendconfig/v1beta1"
"k8s.io/ingress-gce/pkg/healthcheckinterface"
ingparamsclient "k8s.io/ingress-gce/pkg/ingparams/client/clientset/versioned"
informeringparams "k8s.io/ingress-gce/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1"
"k8s.io/ingress-gce/pkg/instances"
Expand All @@ -60,6 +58,8 @@ import (
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
"sync"
"time"
)

const (
Expand All @@ -80,9 +80,10 @@ type ControllerContext struct {

Cloud *gce.Cloud

ClusterNamer *namer.Namer
KubeSystemUID types.UID
L4Namer namer.L4ResourcesNamer
ClusterNamer *namer.Namer
KubeSystemUID types.UID
L4Namer namer.L4ResourcesNamer
L4HealthChecks healthcheckinterface.L4HealthChecks

ControllerContextConfig
ASMConfigController *cmconfig.ConfigMapConfigController
Expand Down
28 changes: 15 additions & 13 deletions pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ limitations under the License.
package firewalls

import (
"strings"
"sync"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
"strings"
)

// FirewallParams holds all data needed to create firewall for L4 LB
Expand All @@ -50,7 +48,7 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
if err != nil {
return err
}
fwDesc, err := utils.MakeL4LBServiceDescription(nsName, params.IP, meta.VersionGA, sharedRule, params.L4Type)
fwDesc, err := utils.MakeL4LBFirewallDescription(nsName, params.IP, meta.VersionGA, sharedRule)
if err != nil {
klog.Warningf("EnsureL4FirewallRule(%v): failed to generate description for L4 %s rule, err: %v", params.Name, params.L4Type.ToString(), err)
}
Expand Down Expand Up @@ -78,7 +76,9 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
}
return err
}
if firewallRuleEqual(expectedFw, existingFw) {

// Don't compare the "description" field for shared firewall rules
if firewallRuleEqual(expectedFw, existingFw, sharedRule) {
return nil
}
klog.V(2).Infof("EnsureL4FirewallRule(%v): updating L4 %s firewall", params.Name, params.L4Type.ToString())
Expand All @@ -103,13 +103,19 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error {
return nil
}

func firewallRuleEqual(a, b *compute.Firewall) bool {
return a.Description == b.Description &&
len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) &&
func firewallRuleEqual(a, b *compute.Firewall, skipDescription bool) bool {
fwrEqual := len(a.Allowed) == 1 &&
len(a.Allowed) == len(b.Allowed) &&
a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
utils.EqualStringSets(a.SourceRanges, b.SourceRanges) &&
utils.EqualStringSets(a.TargetTags, b.TargetTags)

// Don't compare the "description" field for shared firewall rules
if skipDescription {
return fwrEqual
}
return fwrEqual && a.Description == b.Description
}

func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
Expand All @@ -126,12 +132,8 @@ func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud
}

// EnsureL4LBFirewallForHc creates or updates firewall rule for shared or non-shared health check to nodes
func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, sharedResourcesLock *sync.Mutex, recorder record.EventRecorder) error {
func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
params.SourceRanges = gce.L4LoadBalancerSrcRanges()
if shared {
sharedResourcesLock.Lock()
defer sharedResourcesLock.Unlock()
}
return ensureFirewall(svc, shared, params, cloud, recorder)
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/healthcheckinterface/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthcheckinterface

import (
v1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
)

// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services
type L4HealthChecks interface {
// EnsureL4HealthCheck creates health check (and firewall rule) for l4 service
EnsureL4HealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error)
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service
DeleteHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error)
}
164 changes: 144 additions & 20 deletions pkg/healthchecks/healthchecks_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@ package healthchecks

import (
"fmt"
"strconv"
"sync"

cloudprovider "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider/service/helpers"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
)

const (

// L4 Load Balancer parameters
gceHcCheckIntervalSeconds = int64(8)
gceHcTimeoutSeconds = int64(1)
Expand All @@ -42,44 +46,98 @@ const (
gceHcUnhealthyThreshold = int64(3)
)

// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, based on the parameters provided.
type l4HealthChecks struct {
mutex sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
}

// NewL4 creates instance of l4HealthChecks> USe for test only.
func NewL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4HealthChecks {
instance := &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
return instance
}

// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, and the firewall rule required
// for the healthcheck. If healthcheck is shared (external traffic policy 'cluster') then firewall rules will be shared
// regardless of scope param.
// If the healthcheck already exists, it is updated as needed.
func EnsureL4HealthCheck(cloud *gce.Cloud, svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, string, int32, string, error) {
func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) {
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
hcPath, hcPort := gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
if !sharedHC {
hcPath, hcPort = helpers.GetServiceHealthCheckPathPort(svc)
hcPath, hcPort := helpers.GetServiceHealthCheckPathPort(svc)
if sharedHC {
hcPath, hcPort = gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
// lock out entire EnsureL4HealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
}

namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type)
if err != nil {
return "", "", "", annotations.HealthcheckResource, err
}
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return "", "", "", annotations.FirewallForHealthcheckResource, err
}
nn := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := ensureL4HealthCheckInternal(cloud, hcName, nn, sharedHC, hcPath, hcPort, scope, l4Type)
return hcLink, hcFwName, hcPort, hcName, err

return hcLink, hcFwName, hcName, "", err
}

func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) {
if sharedHC {
// lock out entire DeleteHealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
}

hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope))
if err != nil {
if !utils.IsInUsedByError(err) {
klog.Errorf("Failed to delete healthcheck for service %s - %v", namespacedName.String(), err)
return annotations.HealthcheckResource, err
}
// Ignore deletion error due to health check in use by another resource.
// This will be hit if this is a shared healthcheck.
klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName)
return "", nil
}
// Health check deleted, now delete the firewall rule
return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type)
}

func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
selfLink := ""
key, err := composite.CreateKey(cloud, name, scope)
key, err := composite.CreateKey(l4hc.cloud, name, scope)
if err != nil {
return nil, selfLink, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", name, svcName.String())
}
hc, err := composite.GetHealthCheck(cloud, key, meta.VersionGA)
hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
if err != nil {
if !utils.IsNotFoundError(err) {
return nil, selfLink, err
}
}
var region string
if scope == meta.Regional {
region = cloud.Region()
region = l4hc.cloud.Region()
}
expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region)
if hc == nil {
// Create the healthcheck
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared)
err = composite.CreateHealthCheck(cloud, key, expectedHC)
err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
selfLink = cloudprovider.SelfLink(meta.VersionGA, cloud.ProjectID(), "healthChecks", key)
selfLink = cloudprovider.SelfLink(meta.VersionGA, l4hc.cloud.ProjectID(), "healthChecks", key)
return expectedHC, selfLink, nil
}
selfLink = hc.SelfLink
Expand All @@ -89,19 +147,85 @@ func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.Na
}
mergeHealthChecks(hc, expectedHC)
klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName)
err = composite.UpdateHealthCheck(cloud, key, expectedHC)
err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
return expectedHC, selfLink, err
}

func DeleteHealthCheck(cloud *gce.Cloud, name string, scope meta.KeyType) error {
key, err := composite.CreateKey(cloud, name, scope)
// ensureFirewall rule for L4 service.
// The firewall rules are the same for ILB and NetLB that use external traffic policy 'local' (sharedHC == true)
// despite healthchecks have different scopes (global vs regional)
func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error {
// Add firewall rule for healthchecks to nodes
hcFWRParams := firewalls.FirewallParams{
PortRanges: []string{strconv.Itoa(int(hcPort))},
SourceRanges: gce.L4LoadBalancerSrcRanges(),
Protocol: string(corev1.ProtocolTCP),
Name: hcFwName,
NodeNames: nodeNames,
}
return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorderFactory.Recorder(svc.Namespace))
}

func (l4hc *l4HealthChecks) deleteHealthCheck(name string, scope meta.KeyType) error {
key, err := composite.CreateKey(l4hc.cloud, name, scope)
if err != nil {
return fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
}
return composite.DeleteHealthCheck(cloud, key, meta.VersionGA)
return composite.DeleteHealthCheck(l4hc.cloud, key, meta.VersionGA)
}

func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcName, hcFwName string, sharedHC bool, l4Type utils.L4LBType) (string, error) {
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

safeToDelete, err := l4hc.healthCheckFirewallSafeToDelete(hcName, sharedHC, l4Type)
if err != nil {
klog.Errorf("Failed to delete health check firewall rule %s for service %s - %v", hcFwName, namespacedName.String(), err)
return annotations.HealthcheckResource, err
}
if !safeToDelete {
klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
return "", nil
}
// Delete healthcheck firewall rule if no healthcheck uses the firewall rule.
err = l4hc.deleteFirewall(hcFwName, svc)
if err != nil {
klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", hcFwName, namespacedName.String(), err)
return annotations.FirewallForHealthcheckResource, err
}
return "", nil
}

func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, sharedHC bool, l4Type utils.L4LBType) (bool, error) {
if !sharedHC {
return true, nil
}
var scopeToCheck meta.KeyType
scopeToCheck = meta.Regional
if l4Type == utils.XLB {
scopeToCheck = meta.Global
}
key, err := composite.CreateKey(l4hc.cloud, hcName, scopeToCheck)
if err != nil {
return false, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", hcName, err)
}
_, err = composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
return utils.IsNotFoundError(err), nil
}

func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) error {
err := firewalls.EnsureL4FirewallRuleDeleted(l4hc.cloud, name)
if err != nil {
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return err
}
return nil
}

func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
Expand Down
Loading