Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit event on Description-only healthcheck update #2072

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/backends/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Jig struct {
}

func newTestJig(fakeGCE *gce.Cloud) *Jig {
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter())
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

fakeIGs := instancegroups.NewEmptyFakeInstanceGroups()
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var (
)

func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeHealthChecks := healthchecks.NewHealthChecker(fakeGCE, "/", defaultBackendSvc, healthchecks.NewFakeRecorderGetter(0), healthchecks.NewFakeServiceGetter())

fakeBackendPool := NewPool(fakeGCE, defaultNamer)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewLoadBalancerController(
Interface: ctx.KubeClient.CoreV1().Events(""),
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service, ctx, ctx.Translator)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
listers "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -109,6 +110,12 @@ func (t *Translator) getCachedService(id utils.ServicePortID) (*api_v1.Service,
return svc, nil
}

// GetService Implements ServiceGetter interface.
func (t *Translator) GetService(namespace, name string) (*api_v1.Service, error) {
dummyServicePort := utils.ServicePortID{Service: types.NamespacedName{Namespace: namespace, Name: name}}
return t.getCachedService(dummyServicePort)
}

// maybeEnableNEG enables NEG on the service port if necessary
func maybeEnableNEG(sp *utils.ServicePort, svc *api_v1.Service) error {
negAnnotation, ok, err := annotations.FromService(svc).NEGAnnotation()
Expand Down
68 changes: 68 additions & 0 deletions pkg/healthchecks/fakes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthchecks

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
)

func NewFakeServiceGetter() ServiceGetter {
return &fakeServiceGetter{}
}

type fakeServiceGetter struct{}

func (fsg *fakeServiceGetter) GetService(namespace, name string) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
}, nil
}

func NewFakeRecorderGetter(bufferSize int) RecorderGetter {
return &fakeRecorderGetter{bufferSize}
}

type fakeRecorderGetter struct {
bufferSize int
}

// Returns a different record.EventRecorder for every call.
func (frg *fakeRecorderGetter) Recorder(namespace string) record.EventRecorder {
return record.NewFakeRecorder(frg.bufferSize)
}

type singletonFakeRecorderGetter struct {
recorder *record.FakeRecorder
}

// Returns the same record.EventRecorder irrespective of the namespace.
func (sfrg *singletonFakeRecorderGetter) Recorder(namespace string) record.EventRecorder {
return sfrg.FakeRecorder()
}

func (sfrg *singletonFakeRecorderGetter) FakeRecorder() *record.FakeRecorder {
if sfrg.recorder == nil {
panic("singletonFakeRecorderGetter not initialised: recorder is nil.")
}
return sfrg.recorder
}

func NewFakeSingletonRecorderGetter(bufferSize int) *singletonFakeRecorderGetter {
return &singletonFakeRecorderGetter{recorder: record.NewFakeRecorder(bufferSize)}
}
9 changes: 8 additions & 1 deletion pkg/healthchecks/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ type fieldDiffs struct {
func (c *fieldDiffs) add(field, oldv, newv string) {
c.f = append(c.f, fmt.Sprintf("%s:%s -> %s", field, oldv, newv))
}
func (c *fieldDiffs) has(field string) bool {
for _, s := range c.f {
if strings.HasPrefix(s, field+":") {
return true
}
}
return false
}
func (c *fieldDiffs) String() string { return strings.Join(c.f, ", ") }
func (c *fieldDiffs) hasDiff() bool { return len(c.f) > 0 }
func (c *fieldDiffs) size() int64 { return int64(len(c.f)) }
Expand Down Expand Up @@ -74,7 +82,6 @@ func calculateDiff(old, new *translator.HealthCheck, c *backendconfigv1.HealthCh
changes.add("Port", strconv.FormatInt(old.Port, 10), strconv.FormatInt(new.Port, 10))
}
if old.Description != new.Description {
// TODO(DamianSawicki): Emit an event.
changes.add("Description", old.Description, new.Description)
}

Expand Down
37 changes: 35 additions & 2 deletions pkg/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ type HealthChecks struct {
// This is a workaround which allows us to not have to maintain
// a separate health checker for the default backend.
defaultBackendSvc types.NamespacedName
recorderGetter RecorderGetter
serviceGetter ServiceGetter
}

// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.
func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName) *HealthChecks {
return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc}
func NewHealthChecker(cloud HealthCheckProvider, healthCheckPath string, defaultBackendSvc types.NamespacedName, recorderGetter RecorderGetter, serviceGetter ServiceGetter) *HealthChecks {
return &HealthChecks{cloud, healthCheckPath, defaultBackendSvc, recorderGetter, serviceGetter}
}

// new returns a *HealthCheck with default settings and specified port/protocol
Expand All @@ -67,9 +69,31 @@ func (h *HealthChecks) new(sp utils.ServicePort) *translator.HealthCheck {
hc.Name = sp.BackendName()
hc.Port = sp.NodePort
hc.RequestPath = h.pathFromSvcPort(sp)
hc.Service = h.getService(sp)
return hc
}

func (h *HealthChecks) getService(sp utils.ServicePort) *v1.Service {
if !flags.F.EnableUpdateCustomHealthCheckDescription {
return nil
}
namespacedName := h.mainService(sp)
var err error
service, err := h.serviceGetter.GetService(namespacedName.Namespace, namespacedName.Name)
if err != nil {
klog.Warningf("Service %s/%s needed for emitting an event not found (we'll log instead): %v.", namespacedName.Namespace, namespacedName.Name, err)
}
return service
}

func (h *HealthChecks) mainService(sp utils.ServicePort) types.NamespacedName {
service := h.defaultBackendSvc
if sp.ID.Service.Name != "" {
service = sp.ID.Service
}
return service
}

// SyncServicePort implements HealthChecker.
func (h *HealthChecks) SyncServicePort(sp *utils.ServicePort, probe *v1.Probe) (string, error) {
hc := h.new(*sp)
Expand Down Expand Up @@ -142,6 +166,15 @@ func (h *HealthChecks) sync(hc *translator.HealthCheck, bchcc *backendconfigv1.H
changes := calculateDiff(filter(existingHC), filter(hc), bchcc)
if changes.hasDiff() {
klog.V(2).Infof("Health check %q needs update (%s)", existingHC.Name, changes)
if flags.F.EnableUpdateCustomHealthCheckDescription && changes.size() == 1 && changes.has("Description") {
message := fmt.Sprintf("Healthcheck will be updated and the only field updated is Description.\nOld: %+v\nNew: %+v\nDiff: %+v", existingHC, hc, changes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems a very large message from an event, can we just send the diff or do we miss some important information?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if guarantees that the diff contains only the Description update. Recall from our discussions that this event emission was intended as a precaution during Description update, so the event would not serve this purpose well if it contained only the "positive scenario" information.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the message contains the New and Old too, can we send only the changes? do we need the others?

%+v\nNew: %+v\nDiff: %+v", existingHC, hc, changes)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm trying to say that we need the others. The changes aka "diff" inside the if consists only of the modified Description (changes.size() == 1 && changes.has("Description")). However, we want to include more information in the message as a precaution.

A background fact that I should probably mention is that calculateDiff() does not return the full diff. It checks selected fields only (recently Description was added as one of them), sometimes just 2 of them.

if hc.Service != nil {
h.recorderGetter.Recorder(hc.Service.Namespace).Event(
hc.Service, v1.EventTypeNormal, "HealthcheckDescriptionUpdate", message)
} else {
klog.Info(message)
}
}
err := h.update(hc)
if err != nil {
klog.Errorf("Health check %q update error: %v", existingHC.Name, err)
Expand Down
30 changes: 22 additions & 8 deletions pkg/healthchecks/healthchecks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"testing"
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand Down Expand Up @@ -111,7 +113,7 @@ func init() {

func TestHealthCheckAdd(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

sp := &utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, NEGEnabled: false, BackendNamer: testNamer}
_, err := healthChecks.SyncServicePort(sp, nil)
Expand Down Expand Up @@ -149,7 +151,7 @@ func TestHealthCheckAdd(t *testing.T) {

func TestHealthCheckAddExisting(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// HTTP
// Manually insert a health check
Expand Down Expand Up @@ -221,7 +223,7 @@ func TestHealthCheckAddExisting(t *testing.T) {

func TestHealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// Create HTTP HC for 1234
hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP)
Expand Down Expand Up @@ -251,7 +253,7 @@ func TestHealthCheckDelete(t *testing.T) {

func TestHTTP2HealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// Create HTTP2 HC for 1234
hc := translator.DefaultHealthCheck(1234, annotations.ProtocolHTTP2)
Expand All @@ -278,7 +280,7 @@ func TestHTTP2HealthCheckDelete(t *testing.T) {

func TestRegionalHealthCheckDelete(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

hc := healthChecks.new(
utils.ServicePort{
Expand Down Expand Up @@ -330,7 +332,7 @@ func TestHealthCheckUpdate(t *testing.T) {
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

// HTTP
// Manually insert a health check
Expand Down Expand Up @@ -465,7 +467,8 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) {
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
fakeSingletonRecorderGetter := NewFakeSingletonRecorderGetter(1)
healthChecks := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, fakeSingletonRecorderGetter, NewFakeServiceGetter())

_, err := healthChecks.SyncServicePort(defaultSP, nil)
if err != nil {
Expand Down Expand Up @@ -517,6 +520,17 @@ func TestRolloutUpdateCustomHCDescription(t *testing.T) {
if !reflect.DeepEqual(outputBCHC, outputBCHCWithFlag) {
t.Fatalf("Compute healthcheck is:\n%s, want:\n%s", pretty.Sprint(outputBCHC), pretty.Sprint(outputBCHCWithFlag))
}

fakeRecorder := fakeSingletonRecorderGetter.FakeRecorder()
select {
case output := <-fakeRecorder.Events:
if !strings.HasPrefix(output, "Normal HealthcheckDescriptionUpdate") {
t.Fatalf("Incorrect event emitted on healthcheck update: %s.", output)
}
case <-time.After(10 * time.Second):
t.Fatalf("Timeout when expecting Event.")
}

}

func TestVersion(t *testing.T) {
Expand Down Expand Up @@ -1342,7 +1356,7 @@ func TestSyncServicePort(t *testing.T) {
tc.setup(mock)
}

hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc)
hcs := NewHealthChecker(fakeGCE, "/", defaultBackendSvc, NewFakeRecorderGetter(0), NewFakeServiceGetter())

gotSelfLink, err := hcs.SyncServicePort(tc.sp, tc.probe)
if gotErr := err != nil; gotErr != tc.wantErr {
Expand Down
10 changes: 10 additions & 0 deletions pkg/healthchecks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
computebeta "google.golang.org/api/compute/v0.beta"
"google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
)
Expand Down Expand Up @@ -56,3 +57,12 @@ type HealthChecker interface {
Delete(name string, scope meta.KeyType) error
Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error)
}

// ServiceGetter is an interface to retrieve Kubernetes Services.
type ServiceGetter interface {
GetService(namespace, name string) (*v1.Service, error)
}

type RecorderGetter interface {
Recorder(ns string) record.EventRecorder
}
2 changes: 2 additions & 0 deletions pkg/translator/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type HealthCheck struct {
// compute struct back.
computealpha.HTTPHealthCheck
computealpha.HealthCheck

Service *v1.Service
}

// NewHealthCheck creates a HealthCheck which abstracts nested structs away
Expand Down