Skip to content

Commit

Permalink
Merge pull request #2072 from DamianSawicki/emit-event-on-desc-change
Browse files Browse the repository at this point in the history
Emit event on Description-only healthcheck update
  • Loading branch information
k8s-ci-robot authored Apr 21, 2023
2 parents 94fc70c + fe47055 commit f965f77
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/backends/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Jig struct {
}

func newTestJig(fakeGCE *gce.Cloud) *Jig {
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter())
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

fakeIGs := instancegroups.NewEmptyFakeInstanceGroups()
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var (
)

func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter())

fakeBackendPool := NewPool(fakeGCE, defaultNamer)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewLoadBalancerController(
Interface: ctx.KubeClient.CoreV1().Events(""),
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service, ctx, ctx.Translator)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
listers "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -109,6 +110,12 @@ func (t *Translator) getCachedService(id utils.ServicePortID) (*api_v1.Service,
return svc, nil
}

// GetService Implements ServiceGetter interface.
func (t *Translator) GetService(namespace, name string) (*api_v1.Service, error) {
dummyServicePort := utils.ServicePortID{Service: types.NamespacedName{Namespace: namespace, Name: name}}
return t.getCachedService(dummyServicePort)
}

// maybeEnableNEG enables NEG on the service port if necessary
func maybeEnableNEG(sp *utils.ServicePort, svc *api_v1.Service) error {
negAnnotation, ok, err := annotations.FromService(svc).NEGAnnotation()
Expand Down
68 changes: 68 additions & 0 deletions pkg/healthchecks/fakes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecks

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
)

func NewFakeServiceGetter() ServiceGetter {
return &fakeServiceGetter{}
}

type fakeServiceGetter struct{}

func (fsg *fakeServiceGetter) GetService(namespace, name string) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
}, nil
}

func NewFakeRecorderGetter(bufferSize int) RecorderGetter {
return &fakeRecorderGetter{bufferSize}
}

type fakeRecorderGetter struct {
bufferSize int
}

// Returns a different record.EventRecorder for every call.
func (frg *fakeRecorderGetter) Recorder(namespace string) record.EventRecorder {
return record.NewFakeRecorder(frg.bufferSize)
}

type singletonFakeRecorderGetter struct {
recorder *record.FakeRecorder
}

// Returns the same record.EventRecorder irrespective of the namespace.
func (sfrg *singletonFakeRecorderGetter) Recorder(namespace string) record.EventRecorder {
return sfrg.FakeRecorder()
}

func (sfrg *singletonFakeRecorderGetter) FakeRecorder() *record.FakeRecorder {
if sfrg.recorder == nil {
panic("singletonFakeRecorderGetter not initialised: recorder is nil.")
}
return sfrg.recorder
}

func NewFakeSingletonRecorderGetter(bufferSize int) *singletonFakeRecorderGetter {
return &singletonFakeRecorderGetter{recorder: record.NewFakeRecorder(bufferSize)}
}
9 changes: 8 additions & 1 deletion pkg/healthchecks/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type fieldDiffs struct {
func (c *fieldDiffs) add(field, oldv, newv string) {
c.f = append(c.f, fmt.Sprintf("%s:%s -> %s", field, oldv, newv))
}
func (c *fieldDiffs) has(field string) bool {
for _, s := range c.f {
if strings.HasPrefix(s, field+":") {
return true
}
}
return false
}
func (c *fieldDiffs) String() string { return strings.Join(c.f, ", ") }
func (c *fieldDiffs) hasDiff() bool { return len(c.f) > 0 }
func (c *fieldDiffs) size() int64 { return int64(len(c.f)) }
Expand Down Expand Up @@ -74,7 +82,6 @@ func calculateDiff(old, new *translator.HealthCheck, c *backendconfigv1.HealthCh
changes.add("Port", strconv.FormatInt(old.Port, 10), strconv.FormatInt(new.Port, 10))
}
if old.Description != new.Description {
// TODO(DamianSawicki): Emit an event.
changes.add("Description", old.Description, new.Description)
}

Expand Down
37 changes: 35 additions & 2 deletions pkg/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ type HealthChecks struct {
// This is a workaround which allows us to not have to maintain
// a separate health checker for the default backend.
defaultBackendSvc types.NamespacedName
recorderGetter RecorderGetter
serviceGetter ServiceGetter
}

// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.
func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName) *HealthChecks {
return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc}
func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName, recorderGetter RecorderGetter, serviceGetter ServiceGetter) *HealthChecks {
return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc, recorderGetter, serviceGetter}
}

// new returns a *HealthCheck with default settings and specified port/protocol
Expand All @@ -67,9 +69,31 @@ func (h *HealthChecks) new(sp utils.ServicePort) *translator.HealthCheck {
hc.Name = sp.BackendName()
hc.Port = sp.NodePort
hc.RequestPath = h.pathFromSvcPort(sp)
hc.Service = h.getService(sp)
return hc
}

func (h *HealthChecks) getService(sp utils.ServicePort) *v1.Service {
if !flags.F.EnableUpdateCustomHealthCheckDescription {
return nil
}
namespacedName := h.mainService(sp)
var err error
service, err := h.serviceGetter.GetService(namespacedName.Namespace, namespacedName.Name)
if err != nil {
klog.Warningf("Service %s/%s needed for emitting an event not found (we'll log instead): %v.", namespacedName.Namespace, namespacedName.Name, err)
}
return service
}

func (h *HealthChecks) mainService(sp utils.ServicePort) types.NamespacedName {
service := h.defaultBackendSvc
if sp.ID.Service.Name != "" {
service = sp.ID.Service
}
return service
}

// SyncServicePort implements HealthChecker.
func (h *HealthChecks) SyncServicePort(sp *utils.ServicePort, probe *v1.Probe) (string, error) {
hc := h.new(*sp)
Expand Down Expand Up @@ -142,6 +166,15 @@ func (h *HealthChecks) sync(hc *translator.HealthCheck, bchcc *backendconfigv1.H
changes := calculateDiff(filter(existingHC), filter(hc), bchcc)
if changes.hasDiff() {
klog.V(2).Infof("Health check %q needs update (%s)", existingHC.Name, changes)
if flags.F.EnableUpdateCustomHealthCheckDescription && changes.size() == 1 && changes.has("Description") {
message := fmt.Sprintf("Healthcheck will be updated and the only field updated is Description.\nOld: %+v\nNew: %+v\nDiff: %+v", existingHC, hc, changes)
if hc.Service != nil {
h.recorderGetter.Recorder(hc.Service.Namespace).Event(
hc.Service, v1.EventTypeNormal, "HealthcheckDescriptionUpdate", message)
} else {
klog.Info(message)
}
}
err := h.update(hc)
if err != nil {
klog.Errorf("Health check %q update error: %v", existingHC.Name, err)
Expand Down
30 changes: 22 additions & 8 deletions pkg/healthchecks/healthchecks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"testing"
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand Down Expand Up @@ -111,7 +113,7 @@ func init() {

func TestHealthCheckAdd(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

sp := &utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, NEGEnabled: false, BackendNamer: testNamer}
_, err := healthChecks.SyncServicePort(sp, nil)
Expand Down Expand Up @@ -149,7 +151,7 @@ func TestHealthCheckAdd(t *testing.T) {

func TestHealthCheckAddExisting(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// HTTP
// Manually insert a health check
Expand Down Expand Up @@ -221,7 +223,7 @@ func TestHealthCheckAddExisting(t *testing.T) {

func TestHealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// Create HTTP HC for 1234
hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP)
Expand Down Expand Up @@ -251,7 +253,7 @@ func TestHealthCheckDelete(t *testing.T) {

func TestHTTP2HealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// Create HTTP2 HC for 1234
hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP2)
Expand All @@ -278,7 +280,7 @@ func TestHTTP2HealthCheckDelete(t *testing.T) {

func TestRegionalHealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

hc := healthChecks.new(
utils.ServicePort{
Expand Down Expand Up @@ -330,7 +332,7 @@ func TestHealthCheckUpdate(t *testing.T) {
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// HTTP
// Manually insert a health check
Expand Down Expand Up @@ -465,7 +467,8 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) {
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeSingletonRecorderGetter := NewFakeSingletonRecorderGetter(1)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, fakeSingletonRecorderGetter, NewFakeServiceGetter())

_, err := healthChecks.SyncServicePort(defaultSP, nil)
if err != nil {
Expand Down Expand Up @@ -517,6 +520,17 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) {
if !reflect.DeepEqual(outputBCHC, outputBCHCWithFlag) {
t.Fatalf("Compute healthcheck is:\n%s, want:\n%s", pretty.Sprint(outputBCHC), pretty.Sprint(outputBCHCWithFlag))
}

fakeRecorder := fakeSingletonRecorderGetter.FakeRecorder()
select {
case output := <-fakeRecorder.Events:
if !strings.HasPrefix(output, "Normal HealthcheckDescriptionUpdate") {
t.Fatalf("Incorrect event emitted on healthcheck update: %s.", output)
}
case <-time.After(10 * time.Second):
t.Fatalf("Timeout when expecting Event.")
}

}

func TestVersion(t *testing.T) {
Expand Down Expand Up @@ -1342,7 +1356,7 @@ func TestSyncServicePort(t *testing.T) {
tc.setup(mock)
}

hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

gotSelfLink, err := hcs.SyncServicePort(tc.sp, tc.probe)
if gotErr := err != nil; gotErr != tc.wantErr {
Expand Down
10 changes: 10 additions & 0 deletions pkg/healthchecks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
computebeta "google.golang.org/api/compute/v0.beta"
"google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
)
Expand Down Expand Up @@ -56,3 +57,12 @@ type HealthChecker interface {
Delete(name string, scope meta.KeyType) error
Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error)
}

// ServiceGetter is an interface to retrieve Kubernetes Services.
type ServiceGetter interface {
GetService(namespace, name string) (*v1.Service, error)
}

type RecorderGetter interface {
Recorder(ns string) record.EventRecorder
}
2 changes: 2 additions & 0 deletions pkg/translator/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type HealthCheck struct {
// compute struct back.
computealpha.HTTPHealthCheck
computealpha.HealthCheck

Service *v1.Service
}

// NewHealthCheck creates a HealthCheck which abstracts nested structs away
Expand Down

0 comments on commit f965f77

Please sign in to comment.