Skip to content

Commit

Permalink
Add events for major lifecycle events
Browse files Browse the repository at this point in the history
{Add,Remove}Nodes from InstanceGroup
SyncIngress, TranslateIngress, IPChanged, GarbageCollection
  • Loading branch information
bowei committed Jul 14, 2020
1 parent 74680cd commit 26b2c18
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 39 deletions.
30 changes: 16 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/ingress-gce/pkg/common/operator"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecks"
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewLoadBalancerController(
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer, ctx)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down Expand Up @@ -139,8 +140,8 @@ func NewLoadBalancerController(
return
}

klog.V(3).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", common.NamespacedName(addIng))
klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
Expand Down Expand Up @@ -176,11 +177,11 @@ func NewLoadBalancerController(
return
}
if reflect.DeepEqual(old, cur) {
klog.V(3).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
klog.V(2).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
} else {
klog.V(3).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
klog.V(2).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
}

lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(cur)
},
})
Expand Down Expand Up @@ -558,7 +559,8 @@ func (lbc *LoadBalancerController) sync(key string) error {
err := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm)
// Skip emitting an event if ingress does not exist as we cannot retrieve ingress namespace.
if err != nil && ingExists {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", err))
klog.Errorf("Error in GC for %s/%s: %v", ing.Namespace, ing.Name, err)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error: %v", err)
}
// Delete the ingress state for metrics after GC is successful.
if err == nil && ingExists {
Expand All @@ -578,16 +580,16 @@ func (lbc *LoadBalancerController) sync(key string) error {
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPort.ID, lbc.ctx.ClusterNamer)

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
msg := fmt.Errorf("invalid ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.TranslateIngress, "Translation failed: %v", msg)
return msg
}

// Sync GCP resources.
syncState := &syncState{urlMap, ing, nil}
syncErr := lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error syncing to GCP: %v", syncErr.Error())
} else {
// Insert/update the ingress state for metrics after successful sync.
lbc.metrics.SetIngress(key, metrics.NewIngressState(ing, urlMap.AllServicePorts()))
Expand All @@ -598,7 +600,7 @@ func (lbc *LoadBalancerController) sync(key string) error {
// free up enough quota for the next sync to pass.
frontendGCAlgorithm := frontendGCAlgorithm(ingExists, ing)
if gcErr := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm); gcErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", gcErr))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error during garbage collection: %v", gcErr)
return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
}

Expand Down Expand Up @@ -633,7 +635,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
klog.Errorf("PatchIngressStatus(%s/%s) failed: %v", currIng.Namespace, currIng.Name, err)
return err
}
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, events.IPChanged, "IP is now %v", ip)
}
}
annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.backendSyncer)
Expand All @@ -655,7 +657,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if apierrors.IsNotFound(err) {
// TODO: this path should be removed when external certificate managers migrate to a better solution.
const msg = "Could not find TLS certificates. Continuing setup for the load balancer to serve HTTP. Note: this behavior is deprecated and will be removed in a future version of ingress-gce"
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", msg)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, msg)
} else {
klog.Errorf("Could not get certificates for ingress %s/%s: %v", ing.Namespace, ing.Name, err)
return nil, err
Expand All @@ -666,7 +668,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if lbc.ctx.FrontendConfigEnabled {
feConfig, err = frontendconfig.FrontendConfigForIngress(lbc.ctx.FrontendConfigs().List(), ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("%v", err))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error: %v", err)
}
// Object in cache could be changed in-flight. Deepcopy to
// reduce race conditions.
Expand Down
49 changes: 49 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,22 @@ limitations under the License.
package events

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)

const (
	// AddNodes and RemoveNodes are event reasons emitted when node VMs
	// are added to or removed from a GCE InstanceGroup.
	AddNodes    = "IngressGCE_AddNodes"
	RemoveNodes = "IngressGCE_RemoveNodes"

	// SyncIngress, TranslateIngress, IPChanged and GarbageCollection are
	// event reasons for major Ingress lifecycle events.
	SyncIngress       = "Sync"
	TranslateIngress  = "Translate"
	IPChanged         = "IPChanged"
	GarbageCollection = "GarbageCollection"

	// SyncService is the event reason for Service syncs. NOTE: it
	// deliberately shares the string "Sync" with SyncIngress.
	SyncService = "Sync"
)

type RecorderProducer interface {
Recorder(ns string) record.EventRecorder
}
Expand All @@ -30,3 +43,39 @@ type RecorderProducerMock struct {
func (r RecorderProducerMock) Recorder(ns string) record.EventRecorder {
return &record.FakeRecorder{}
}

// GlobalEventf records a cluster-level event not attached to a given object.
func GlobalEventf(r record.EventRecorder, eventtype, reason, messageFmt string, args ...interface{}) {
	// Using an empty ObjectReference to indicate no associated
	// resource. This apparently works, see the package
	// k8s.io/client-go/tools/record.
	r.Eventf(&v1.ObjectReference{}, eventtype, reason, messageFmt, args...)
}

// truncatedStringListMax is a variable to make testing easier. This
// value should not be modified.
var truncatedStringListMax = 2000

// TruncatedStringList will render the list of items as a string,
// eliding elements with an ellipsis at the end if there are more than
// a reasonable number of characters in the resulting string. This is
// used to prevent accidentally dumping enormous strings into the
// Event description.
func TruncatedStringList(items []string) string {
	var (
		ret   = "["
		first = true
	)
	for _, s := range items {
		// Stop once appending the next element would exceed the cap;
		// mark the elision so readers know items were dropped.
		if len(ret)+len(s)+1 > truncatedStringListMax {
			ret += ", ..."
			break
		}
		if !first {
			ret += ", "
		}
		first = false
		ret += s
	}
	return ret + "]"
}
34 changes: 34 additions & 0 deletions pkg/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package events

import (
"fmt"
"testing"
)

// TestTruncatedStringList verifies both the plain rendering and the
// truncation ("...") behavior of TruncatedStringList.
func TestTruncatedStringList(t *testing.T) {
	// Shrink the cap so truncation is reachable with tiny inputs;
	// restore the package-level value when the test finishes.
	var saved int
	truncatedStringListMax, saved = 30, truncatedStringListMax
	defer func() { truncatedStringListMax = saved }()

	for _, tc := range []struct {
		desc  string
		count int
		want  string
	}{
		{"zero", 0, "[]"},
		{"one", 1, "[elt-0]"},
		{"not truncated", 4, "[elt-0, elt-1, elt-2, elt-3]"},
		{"truncated", 20, "[elt-0, elt-1, elt-2, elt-3, ...]"},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			var l []string
			for i := 0; i < tc.count; i++ {
				l = append(l, fmt.Sprintf("elt-%d", i))
			}
			got := TruncatedStringList(l)
			if got != tc.want {
				// Name the function under test correctly in the failure
				// message (was "TruncatedString").
				t.Errorf("TruncatedStringList(%v) = %q; want %q", l, got, tc.want)
			}
		})
	}
}
35 changes: 26 additions & 9 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"net/http"
"time"

"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

"google.golang.org/api/compute/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -39,16 +42,22 @@ const (
// Instances implements NodePool: it syncs Kubernetes nodes into the
// cloud InstanceGroups that back the load balancer.
type Instances struct {
	cloud InstanceGroups
	ZoneLister
	namer    namer.BackendNamer
	recorder record.EventRecorder // emits cluster-scoped node add/remove events
}

// recorderSource provides namespace-scoped event recorders; an empty
// namespace yields a cluster-scoped recorder.
// NOTE(review): this mirrors events.RecorderProducer — consider reusing it.
type recorderSource interface {
	Recorder(ns string) record.EventRecorder
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer) NodePool {
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource) NodePool {
return &Instances{
cloud: cloud,
namer: namer,
cloud: cloud,
namer: namer,
recorder: recorders.Recorder(""), // No namespace
}
}

Expand Down Expand Up @@ -250,7 +259,8 @@ func (i *Instances) splitNodesByZone(names []string) map[string][]string {

// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -260,12 +270,16 @@ func (i *Instances) Add(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("AddInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -275,7 +289,10 @@ func (i *Instances) Remove(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("RemoveInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Sync nodes with the instances in the instance group.
Expand Down
3 changes: 2 additions & 1 deletion pkg/instances/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/namer"
)

Expand All @@ -28,7 +29,7 @@ const defaultZone = "default-zone"
var defaultNamer = namer.NewNamer("uid1", "fw1")

// newNodePool returns a NodePool backed by fake instance groups and a fake
// recorder source, initialized with a single-zone lister for zone.
func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
	pool := NewNodePool(f, defaultNamer, &test.FakeRecorderSource{})
	pool.Init(&FakeZoneLister{[]string{zone}})
	return pool
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/loadbalancers/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -92,6 +94,7 @@ func (l *L7) checkForwardingRule(protocol namer.NamerProtocol, name, proxyLink,
return nil, err
}
existing = nil
l.recorder.Eventf(l.runtimeInfo.Ingress, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name)
}
if existing == nil {
// This is a special case where exactly one of http or https forwarding rule
Expand All @@ -115,6 +118,8 @@ func (l *L7) checkForwardingRule(protocol namer.NamerProtocol, name, proxyLink,
if err = composite.CreateForwardingRule(l.cloud, key, fr); err != nil {
return nil, err
}
l.recorder.Eventf(l.runtimeInfo.Ingress, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q created", key.Name)

key, err = l.CreateKey(name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -293,6 +298,7 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I
if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, existingVersion)); err != nil {
return nil, err
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name)
}
}
klog.V(2).Infof("ensureForwardingRule: Recreating forwarding rule - %s", fr.Name)
Expand All @@ -302,9 +308,11 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I
if addrMgr != nil {
// Now that the controller knows the forwarding rule exists, we can release the address.
if err := addrMgr.ReleaseAddress(); err != nil {
klog.Errorf("ensureInternalLoadBalancer: failed to release address reservation, possibly causing an orphan: %v", err)
klog.Errorf("ensureInternalLoadBalancer: - %s, failed to release address reservation, possibly causing an orphan: %v", fr.Name, err)
}
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q created", key.Name)

return composite.GetForwardingRule(l.cloud, key, fr.Version)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/loadbalancers/loadbalancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/loadbalancers/features"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
Expand Down Expand Up @@ -142,7 +143,7 @@ func newIngress() *v1beta1.Ingress {
}

func newFakeLoadBalancerPool(cloud *gce.Cloud, t *testing.T, namer *namer_util.Namer) L7s {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), namer)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), namer, test.FakeRecorderSource{})
nodePool := instances.NewNodePool(fakeIGs, namer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})

Expand Down
Loading

0 comments on commit 26b2c18

Please sign in to comment.