From fe470555581661173840ddffc34406da4c9c9856 Mon Sep 17 00:00:00 2001 From: Damian Sawicki Date: Fri, 7 Apr 2023 13:18:07 +0000 Subject: [PATCH] Emit event on Description-only healthcheck update --- pkg/backends/integration_test.go | 2 +- pkg/backends/syncer_test.go | 2 +- pkg/controller/controller.go | 2 +- pkg/controller/translator/translator.go | 7 +++ pkg/healthchecks/fakes.go | 68 +++++++++++++++++++++++++ pkg/healthchecks/healthcheck.go | 9 +++- pkg/healthchecks/healthchecks.go | 37 +++++++++++++- pkg/healthchecks/healthchecks_test.go | 30 ++++++++--- pkg/healthchecks/interfaces.go | 10 ++++ pkg/translator/healthchecks.go | 2 + 10 files changed, 155 insertions(+), 14 deletions(-) create mode 100644 pkg/healthchecks/fakes.go diff --git a/pkg/backends/integration_test.go b/pkg/backends/integration_test.go index d3c961f661..041b93c6fe 100644 --- a/pkg/backends/integration_test.go +++ b/pkg/backends/integration_test.go @@ -42,7 +42,7 @@ type Jig struct { } func newTestJig(fakeGCE *gce.Cloud) *Jig { - fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter()) fakeBackendPool := NewPool(fakeGCE, defaultNamer) fakeIGs := instancegroups.NewEmptyFakeInstanceGroups() diff --git a/pkg/backends/syncer_test.go b/pkg/backends/syncer_test.go index 88432dbc24..d900b59abb 100644 --- a/pkg/backends/syncer_test.go +++ b/pkg/backends/syncer_test.go @@ -137,7 +137,7 @@ var ( ) func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer { - fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter()) fakeBackendPool := NewPool(fakeGCE, defaultNamer) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 64d58f8ead..8b214d0870 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -114,7 +114,7 @@ func NewLoadBalancerController( Interface: ctx.KubeClient.CoreV1().Events(""), }) - healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service) + healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service, ctx, ctx.Translator) backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer) lbc := LoadBalancerController{ diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index de3008ce95..9128a9b5bd 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" listers "k8s.io/client-go/listers/core/v1" @@ -109,6 +110,12 @@ func (t *Translator) getCachedService(id utils.ServicePortID) (*api_v1.Service, return svc, nil } +// GetService Implements ServiceGetter interface. +func (t *Translator) GetService(namespace, name string) (*api_v1.Service, error) { + dummyServicePort := utils.ServicePortID{Service: types.NamespacedName{Namespace: namespace, Name: name}} + return t.getCachedService(dummyServicePort) +} + // maybeEnableNEG enables NEG on the service port if necessary func maybeEnableNEG(sp *utils.ServicePort, svc *api_v1.Service) error { negAnnotation, ok, err := annotations.FromService(svc).NEGAnnotation() diff --git a/pkg/healthchecks/fakes.go b/pkg/healthchecks/fakes.go new file mode 100644 index 0000000000..9d437a0591 --- /dev/null +++ b/pkg/healthchecks/fakes.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecks + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" +) + +func NewFakeServiceGetter() ServiceGetter { + return &fakeServiceGetter{} +} + +type fakeServiceGetter struct{} + +func (fsg *fakeServiceGetter) GetService(namespace, name string) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}, + }, nil +} + +func NewFakeRecorderGetter(bufferSize int) RecorderGetter { + return &fakeRecorderGetter{bufferSize} +} + +type fakeRecorderGetter struct { + bufferSize int +} + +// Returns a different record.EventRecorder for every call. +func (frg *fakeRecorderGetter) Recorder(namespace string) record.EventRecorder { + return record.NewFakeRecorder(frg.bufferSize) +} + +type singletonFakeRecorderGetter struct { + recorder *record.FakeRecorder +} + +// Returns the same record.EventRecorder irrespective of the namespace. +func (sfrg *singletonFakeRecorderGetter) Recorder(namespace string) record.EventRecorder { + return sfrg.FakeRecorder() +} + +func (sfrg *singletonFakeRecorderGetter) FakeRecorder() *record.FakeRecorder { + if sfrg.recorder == nil { + panic("singletonFakeRecorderGetter not initialised: recorder is nil.") + } + return sfrg.recorder +} + +func NewFakeSingletonRecorderGetter(bufferSize int) *singletonFakeRecorderGetter { + return &singletonFakeRecorderGetter{recorder: record.NewFakeRecorder(bufferSize)} +} diff --git a/pkg/healthchecks/healthcheck.go b/pkg/healthchecks/healthcheck.go index 2c605c25f3..70901333c2 100644 --- a/pkg/healthchecks/healthcheck.go +++ b/pkg/healthchecks/healthcheck.go @@ -33,6 +33,14 @@ type fieldDiffs struct { func (c *fieldDiffs) add(field, oldv, newv string) { c.f = append(c.f, fmt.Sprintf("%s:%s -> %s", field, oldv, newv)) } +func (c *fieldDiffs) has(field string) bool { + for _, s := range c.f { + if strings.HasPrefix(s, field+":") { + return true + } + } + return false +} func (c *fieldDiffs) String() string { return strings.Join(c.f, ", ") } func (c *fieldDiffs) hasDiff() bool { return len(c.f) > 0 } func (c *fieldDiffs) size() int64 { return int64(len(c.f)) } @@ -74,7 +82,6 @@ func calculateDiff(old, new *translator.HealthCheck, c *backendconfigv1.HealthCh changes.add("Port", strconv.FormatInt(old.Port, 10), strconv.FormatInt(new.Port, 10)) } if old.Description != new.Description { - // TODO(DamianSawicki): Emit an event. changes.add("Description", old.Description, new.Description) } diff --git a/pkg/healthchecks/healthchecks.go b/pkg/healthchecks/healthchecks.go index cb7ae56451..2176798e84 100644 --- a/pkg/healthchecks/healthchecks.go +++ b/pkg/healthchecks/healthchecks.go @@ -43,13 +43,15 @@ type HealthChecks struct { // This is a workaround which allows us to not have to maintain // a separate health checker for the default backend. defaultBackendSvc types.NamespacedName + recorderGetter RecorderGetter + serviceGetter ServiceGetter } // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. -func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName) *HealthChecks { - return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc} +func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName, recorderGetter RecorderGetter, serviceGetter ServiceGetter) *HealthChecks { + return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc, recorderGetter, serviceGetter} } // new returns a *HealthCheck with default settings and specified port/protocol @@ -67,9 +69,31 @@ func (h *HealthChecks) new(sp utils.ServicePort) *translator.HealthCheck { hc.Name = sp.BackendName() hc.Port = sp.NodePort hc.RequestPath = h.pathFromSvcPort(sp) + hc.Service = h.getService(sp) return hc } +func (h *HealthChecks) getService(sp utils.ServicePort) *v1.Service { + if !flags.F.EnableUpdateCustomHealthCheckDescription { + return nil + } + namespacedName := h.mainService(sp) + var err error + service, err := h.serviceGetter.GetService(namespacedName.Namespace, namespacedName.Name) + if err != nil { + klog.Warningf("Service %s/%s needed for emitting an event not found (we'll log instead): %v.", namespacedName.Namespace, namespacedName.Name, err) + } + return service +} + +func (h *HealthChecks) mainService(sp utils.ServicePort) types.NamespacedName { + service := h.defaultBackendSvc + if sp.ID.Service.Name != "" { + service = sp.ID.Service + } + return service +} + // SyncServicePort implements HealthChecker. func (h *HealthChecks) SyncServicePort(sp *utils.ServicePort, probe *v1.Probe) (string, error) { hc := h.new(*sp) @@ -142,6 +166,15 @@ func (h *HealthChecks) sync(hc *translator.HealthCheck, bchcc *backendconfigv1.H changes := calculateDiff(filter(existingHC), filter(hc), bchcc) if changes.hasDiff() { klog.V(2).Infof("Health check %q needs update (%s)", existingHC.Name, changes) + if flags.F.EnableUpdateCustomHealthCheckDescription && changes.size() == 1 && changes.has("Description") { + message := fmt.Sprintf("Healthcheck will be updated and the only field updated is Description.\nOld: %+v\nNew: %+v\nDiff: %+v", existingHC, hc, changes) + if hc.Service != nil { + h.recorderGetter.Recorder(hc.Service.Namespace).Event( + hc.Service, v1.EventTypeNormal, "HealthcheckDescriptionUpdate", message) + } else { + klog.Info(message) + } + } err := h.update(hc) if err != nil { klog.Errorf("Health check %q update error: %v", existingHC.Name, err) diff --git a/pkg/healthchecks/healthchecks_test.go b/pkg/healthchecks/healthchecks_test.go index f23e7637b1..a0c60f263d 100644 --- a/pkg/healthchecks/healthchecks_test.go +++ b/pkg/healthchecks/healthchecks_test.go @@ -21,7 +21,9 @@ import ( "fmt" "net/http" "reflect" + "strings" "testing" + "time" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" @@ -111,7 +113,7 @@ func init() { func TestHealthCheckAdd(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) sp := &utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, NEGEnabled: false, BackendNamer: testNamer} _, err := healthChecks.SyncServicePort(sp, nil) @@ -149,7 +151,7 @@ func TestHealthCheckAdd(t *testing.T) { func TestHealthCheckAddExisting(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) // HTTP // Manually insert a health check @@ -221,7 +223,7 @@ func TestHealthCheckAddExisting(t *testing.T) { func TestHealthCheckDelete(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) // Create HTTP HC for 1234 hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP) @@ -251,7 +253,7 @@ func TestHealthCheckDelete(t *testing.T) { func TestHTTP2HealthCheckDelete(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) // Create HTTP2 HC for 1234 hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP2) @@ -278,7 +280,7 @@ func TestHTTP2HealthCheckDelete(t *testing.T) { func TestRegionalHealthCheckDelete(t *testing.T) { fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) hc := healthChecks.new( utils.ServicePort{ @@ -330,7 +332,7 @@ func TestHealthCheckUpdate(t *testing.T) { (fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook (fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) // HTTP // Manually insert a health check @@ -465,7 +467,8 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) { (fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook (fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook - healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + fakeSingletonRecorderGetter := NewFakeSingletonRecorderGetter(1) + healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, fakeSingletonRecorderGetter, NewFakeServiceGetter()) _, err := healthChecks.SyncServicePort(defaultSP, nil) if err != nil { @@ -517,6 +520,17 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) { if !reflect.DeepEqual(outputBCHC, outputBCHCWithFlag) { t.Fatalf("Compute healthcheck is:\n%s, want:\n%s", pretty.Sprint(outputBCHC), pretty.Sprint(outputBCHCWithFlag)) } + + fakeRecorder := fakeSingletonRecorderGetter.FakeRecorder() + select { + case output := <-fakeRecorder.Events: + if !strings.HasPrefix(output, "Normal HealthcheckDescriptionUpdate") { + t.Fatalf("Incorrect event emitted on healthcheck update: %s.", output) + } + case <-time.After(10 * time.Second): + t.Fatalf("Timeout when expecting Event.") + } + } func TestVersion(t *testing.T) { @@ -1342,7 +1356,7 @@ func TestSyncServicePort(t *testing.T) { tc.setup(mock) } - hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc) + hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter()) gotSelfLink, err := hcs.SyncServicePort(tc.sp, tc.probe) if gotErr := err != nil; gotErr != tc.wantErr { diff --git a/pkg/healthchecks/interfaces.go b/pkg/healthchecks/interfaces.go index 60cb8d9f79..899329613d 100644 --- a/pkg/healthchecks/interfaces.go +++ b/pkg/healthchecks/interfaces.go @@ -22,6 +22,7 @@ import ( computebeta "google.golang.org/api/compute/v0.beta" "google.golang.org/api/compute/v1" v1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" "k8s.io/ingress-gce/pkg/translator" "k8s.io/ingress-gce/pkg/utils" ) @@ -56,3 +57,12 @@ type HealthChecker interface { Delete(name string, scope meta.KeyType) error Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error) } + +// ServiceGetter is an interface to retrieve Kubernetes Services. +type ServiceGetter interface { + GetService(namespace, name string) (*v1.Service, error) +} + +type RecorderGetter interface { + Recorder(ns string) record.EventRecorder +} diff --git a/pkg/translator/healthchecks.go b/pkg/translator/healthchecks.go index caa541559c..2339120e97 100644 --- a/pkg/translator/healthchecks.go +++ b/pkg/translator/healthchecks.go @@ -90,6 +90,8 @@ type HealthCheck struct { // compute struct back. computealpha.HTTPHealthCheck computealpha.HealthCheck + + Service *v1.Service } // NewHealthCheck creates a HealthCheck which abstracts nested structs away