Skip to content

Commit

Permalink
Emit event on Description-only healthcheck update
Browse files Browse the repository at this point in the history
  • Loading branch information
DamianSawicki committed Apr 14, 2023
1 parent 4c7c629 commit d851dac
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/backends/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Jig struct {
}

func newTestJig(fakeGCE *gce.Cloud) *Jig {
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter())
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

fakeIGs := instancegroups.NewEmptyFakeInstanceGroups()
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var (
)

func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter())

fakeBackendPool := NewPool(fakeGCE, defaultNamer)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewLoadBalancerController(
Interface: ctx.KubeClient.CoreV1().Events(""),
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service, ctx, ctx.Translator)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
listers "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -109,6 +110,12 @@ func (t *Translator) getCachedService(id utils.ServicePortID) (*api_v1.Service,
return svc, nil
}

// GetService Implements ServiceGetter interface.
func (t *Translator) GetService(namespace, name string) (*api_v1.Service, error) {
dummyServicePort := utils.ServicePortID{Service: types.NamespacedName{Namespace: namespace, Name: name}}
return t.getCachedService(dummyServicePort)
}

// maybeEnableNEG enables NEG on the service port if necessary
func maybeEnableNEG(sp *utils.ServicePort, svc *api_v1.Service) error {
negAnnotation, ok, err := annotations.FromService(svc).NEGAnnotation()
Expand Down
8 changes: 8 additions & 0 deletions pkg/healthchecks/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type fieldDiffs struct {
func (c *fieldDiffs) add(field, oldv, newv string) {
c.f = append(c.f, fmt.Sprintf("%s:%s -> %s", field, oldv, newv))
}
func (c *fieldDiffs) has(field string) bool {
for _, s := range c.f {
if strings.HasPrefix(s, field+":") {
return true
}
}
return false
}
func (c *fieldDiffs) String() string { return strings.Join(c.f, ", ") }
func (c *fieldDiffs) hasDiff() bool { return len(c.f) > 0 }
func (c *fieldDiffs) size() int64 { return int64(len(c.f)) }
Expand Down
25 changes: 23 additions & 2 deletions pkg/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ type HealthChecks struct {
// This is a workaround which allows us to not have to maintain
// a separate health checker for the default backend.
defaultBackendSvc types.NamespacedName
recorderGetter RecorderGetter
serviceGetter ServiceGetter
}

// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.
func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName) *HealthChecks {
return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc}
func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName, recorderGetter RecorderGetter, serviceGetter ServiceGetter) *HealthChecks {
return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc, recorderGetter, serviceGetter}
}

// new returns a *HealthCheck with default settings and specified port/protocol
Expand All @@ -67,9 +69,23 @@ func (h *HealthChecks) new(sp utils.ServicePort) *translator.HealthCheck {
hc.Name = sp.BackendName()
hc.Port = sp.NodePort
hc.RequestPath = h.pathFromSvcPort(sp)
service := h.mainService(sp)
var err error
hc.Service, err = h.serviceGetter.GetService(service.Namespace, service.Name)
if err != nil {
klog.Errorf("Service %s/%s not found: %v.", service.Namespace, service.Name, err)
}
return hc
}

func (h *HealthChecks) mainService(sp utils.ServicePort) types.NamespacedName {
service := h.defaultBackendSvc
if sp.ID.Service.Name != "" {
service = sp.ID.Service
}
return service
}

// SyncServicePort implements HealthChecker.
func (h *HealthChecks) SyncServicePort(sp *utils.ServicePort, probe *v1.Probe) (string, error) {
hc := h.new(*sp)
Expand Down Expand Up @@ -142,6 +158,11 @@ func (h *HealthChecks) sync(hc *translator.HealthCheck, bchcc *backendconfigv1.H
changes := calculateDiff(filter(existingHC), filter(hc), bchcc)
if changes.hasDiff() {
klog.V(2).Infof("Health check %q needs update (%s)", existingHC.Name, changes)
if flags.F.EnableUpdateCustomHealthCheckDescription && changes.size() == 1 && changes.has("Description") {
message := fmt.Sprintf("Healthcheck will be updated and the only field updated is Description.\nOld: %+v\nNew: %+v\nDiff: %+v", existingHC, hc, changes)
h.recorderGetter.Recorder(hc.Service.Namespace).Event(
hc.Service, "Warning", "HealthcheckDescriptionUpdate", message)
}
err := h.update(hc)
if err != nil {
klog.Errorf("Health check %q update error: %v", existingHC.Name, err)
Expand Down
30 changes: 22 additions & 8 deletions pkg/healthchecks/healthchecks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"testing"
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand Down Expand Up @@ -111,7 +113,7 @@ func init() {

func TestHealthCheckAdd(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

sp := &utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, NEGEnabled: false, BackendNamer: testNamer}
_, err := healthChecks.SyncServicePort(sp, nil)
Expand Down Expand Up @@ -149,7 +151,7 @@ func TestHealthCheckAdd(t *testing.T) {

func TestHealthCheckAddExisting(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// HTTP
// Manually insert a health check
Expand Down Expand Up @@ -221,7 +223,7 @@ func TestHealthCheckAddExisting(t *testing.T) {

func TestHealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// Create HTTP HC for 1234
hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP)
Expand Down Expand Up @@ -251,7 +253,7 @@ func TestHealthCheckDelete(t *testing.T) {

func TestHTTP2HealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// Create HTTP2 HC for 1234
hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP2)
Expand All @@ -278,7 +280,7 @@ func TestHTTP2HealthCheckDelete(t *testing.T) {

func TestRegionalHealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

hc := healthChecks.new(
utils.ServicePort{
Expand Down Expand Up @@ -330,7 +332,7 @@ func TestHealthCheckUpdate(t *testing.T) {
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// HTTP
// Manually insert a health check
Expand Down Expand Up @@ -465,7 +467,8 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) {
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeSingletonRecorderGetter := NewFakeSingletonRecorderGetter(1)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, fakeSingletonRecorderGetter, NewFakeServiceGetter())

_, err := healthChecks.SyncServicePort(defaultSP, nil)
if err != nil {
Expand Down Expand Up @@ -517,6 +520,17 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) {
if !reflect.DeepEqual(outputBCHC, outputBCHCWithFlag) {
t.Fatalf("Compute healthcheck is:\n%s, want:\n%s", pretty.Sprint(outputBCHC), pretty.Sprint(outputBCHCWithFlag))
}

fakeRecorder := fakeSingletonRecorderGetter.FakeRecorder()
select {
case output := <-fakeRecorder.Events:
if !strings.HasPrefix(output, "Warning HealthcheckDescriptionUpdate") {
t.Fatalf("Incorrect event emitted on healthcheck update: %s.", output)
}
case <-time.After(10 * time.Second):
t.Fatalf("Timeout when expecting Event.")
}

}

func TestVersion(t *testing.T) {
Expand Down Expand Up @@ -1342,7 +1356,7 @@ func TestSyncServicePort(t *testing.T) {
tc.setup(mock)
}

hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

gotSelfLink, err := hcs.SyncServicePort(tc.sp, tc.probe)
if gotErr := err != nil; gotErr != tc.wantErr {
Expand Down
53 changes: 53 additions & 0 deletions pkg/healthchecks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
computebeta "google.golang.org/api/compute/v0.beta"
"google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
)
Expand Down Expand Up @@ -56,3 +58,54 @@ type HealthChecker interface {
Delete(name string, scope meta.KeyType) error
Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error)
}

// ServiceGetter is an interface to retrieve Kubernetes Services.
type ServiceGetter interface {
GetService(namespace, name string) (*v1.Service, error)
}

func NewFakeServiceGetter() ServiceGetter {
return &fakeServiceGetter{}
}

type fakeServiceGetter struct{}

func (fsg *fakeServiceGetter) GetService(namespace, name string) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
}, nil
}

type RecorderGetter interface {
Recorder(ns string) record.EventRecorder
}

func NewFakeRecorderGetter(bufferSize int) RecorderGetter {
return &fakeRecorderGetter{bufferSize}
}

type fakeRecorderGetter struct {
bufferSize int
}

// Returns a different record.EventRecorder for every call.
func (frg *fakeRecorderGetter) Recorder(namespace string) record.EventRecorder {
return record.NewFakeRecorder(frg.bufferSize)
}

type SingletonFakeRecorderGetter struct {
recorder *record.FakeRecorder
}

// Returns the same record.EventRecorder irrespective of the namespace.
func (sfrg *SingletonFakeRecorderGetter) Recorder(namespace string) record.EventRecorder {
return sfrg.recorder
}

func (sfrg *SingletonFakeRecorderGetter) FakeRecorder() *record.FakeRecorder {
return sfrg.recorder
}

func NewFakeSingletonRecorderGetter(bufferSize int) *SingletonFakeRecorderGetter {
return &SingletonFakeRecorderGetter{recorder: record.NewFakeRecorder(bufferSize)}
}
2 changes: 2 additions & 0 deletions pkg/translator/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type HealthCheck struct {
// compute struct back.
computealpha.HTTPHealthCheck
computealpha.HealthCheck

Service *v1.Service
}

// NewHealthCheck creates a HealthCheck which abstracts nested structs away
Expand Down

0 comments on commit d851dac

Please sign in to comment.