From ca872dd72a57107ca248661b8cf474e15a89bb28 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Wed, 15 Jul 2020 23:16:31 -0700 Subject: [PATCH 1/2] Remove unnecessary restarts of metricsets while using Node autodiscover --- CHANGELOG.next.asciidoc | 1 + .../autodiscover/providers/kubernetes/node.go | 36 ++++ .../providers/kubernetes/node_test.go | 175 ++++++++++++++++++ libbeat/common/kubernetes/watcher.go | 5 +- 4 files changed, 216 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ac1904a069a4..b1614389aa92 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix metrics hints builder to avoid wrong container metadata usage when port is not exposed {pull}18979[18979] - Server-side TLS config now validates certificate and key are both specified {pull}19584[19584] - Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054] +- Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index bd529582f0cd..c92ee80753df 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -23,6 +23,7 @@ import ( "github.com/gofrs/uuid" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" @@ -66,6 +67,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Node: config.Node, + IsUpdated: isUpdated, }, nil) if err != nil { @@ -190,6 +192,40 @@ func (n *node) emit(node *kubernetes.Node, flag string) { n.publish(event) } +func isUpdated(o, n interface{}) bool { + old, _ := o.(*kubernetes.Node) + new, _ := n.(*kubernetes.Node) + + // Consider as not update in case one of the two objects is not a Node + if old == nil || new == nil { + return true + } + + // This is a resync. It is not an update + if old.ResourceVersion == new.ResourceVersion { + return false + } + + // If the old object and new object are different + oldCopy := old.DeepCopy() + oldCopy.ResourceVersion = "" + + newCopy := new.DeepCopy() + newCopy.ResourceVersion = "" + + // If the old object and new object are different in either meta or spec then there is a valid change + fmt.Println(equality.Semantic.DeepDerivative(oldCopy.Spec, newCopy.Spec), equality.Semantic.DeepDerivative(oldCopy.ObjectMeta, newCopy.ObjectMeta)) + if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) { + return true + } + + // If there is a change in the node status then there is a valid change. + if isNodeReady(old) != isNodeReady(new) { + return true + } + return false +} + func getAddress(node *kubernetes.Node) string { for _, address := range node.Status.Addresses { if address.Type == v1.NodeExternalIP && address.Address != "" { diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index 59fb67ada7da..736bd153cf20 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -278,3 +278,178 @@ func TestEmitEvent_Node(t *testing.T) { }) } } + +func TestNode_isUpdated(t *testing.T) { + tests := []struct { + old *kubernetes.Node + new *kubernetes.Node + updated bool + test string + }{ + { + test: "one of the objects is nil then its updated", + old: nil, + new: &kubernetes.Node{}, + updated: true, + }, + { + test: "both empty nodes should return not updated", + old: &kubernetes.Node{}, + new: &kubernetes.Node{}, + updated: false, + }, + { + test: "resource version is the same should return not updated", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + }, + }, + }, + { + test: "if meta changes then it should return updated", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{}, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + }, + updated: true, + }, + { + test: "if spec changes then it should return updated", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: false, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + }, + updated: true, + }, + { + test: "if overall status doesn't change then its not an update", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + updated: false, + }, + { + test: "if node status changes then its an update", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionFalse, + }, + }, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + updated: true, + }, + } + + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + assert.Equal(t, test.updated, isUpdated(test.old, test.new)) + }) + } +} diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 606a36ac1099..cf810c4a5171 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -66,6 +66,8 @@ type WatchOptions struct { Node string // Namespace is used for filtering watched resource to given namespace, use "" for all namespaces Namespace string + + IsUpdated func(old, new interface{}) bool } type item struct { @@ -123,7 +125,8 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption new, _ := accessor.ResourceVersion(n.(runtime.Object)) // Only enqueue changes that have a different resource versions to avoid processing resyncs. - if old != new { + // Also if there is a registered function to check for updates, use it to check for updates. + if old != new && opts.IsUpdated != nil && opts.IsUpdated(o, n) { w.enqueue(n, update) } }, From ba464e1e47c219637cf7cf2405c298eeb6ac7718 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Mon, 20 Jul 2020 11:12:25 -0700 Subject: [PATCH 2/2] Remove print statement and incorporate review comments --- .../autodiscover/providers/kubernetes/node.go | 1 - libbeat/common/kubernetes/watcher.go | 23 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index c92ee80753df..a78622756cd6 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -214,7 +214,6 @@ func isUpdated(o, n interface{}) bool { newCopy.ResourceVersion = "" // If the old object and new object are different in either meta or spec then there is a valid change - fmt.Println(equality.Semantic.DeepDerivative(oldCopy.Spec, newCopy.Spec), equality.Semantic.DeepDerivative(oldCopy.ObjectMeta, newCopy.ObjectMeta)) if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) { return true } diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index cf810c4a5171..3cef13944ecc 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -66,7 +66,8 @@ type WatchOptions struct { Node string // Namespace is used for filtering watched resource to given namespace, use "" for all namespaces Namespace string - + // IsUpdated allows registering a func that allows the invoker of the Watch to decide what amounts to an update + // vs what does not. IsUpdated func(old, new interface{}) bool } @@ -102,6 +103,19 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType) ctx, cancel := context.WithCancel(context.Background()) + if opts.IsUpdated == nil { + opts.IsUpdated = func(o, n interface{}) bool { + old, _ := accessor.ResourceVersion(o.(runtime.Object)) + new, _ := accessor.ResourceVersion(n.(runtime.Object)) + + // Only enqueue changes that have a different resource versions to avoid processing resyncs. + if old != new { + return true + } + return false + } + } + w := &watcher{ client: client, informer: informer, @@ -121,12 +135,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption w.enqueue(o, delete) }, UpdateFunc: func(o, n interface{}) { - old, _ := accessor.ResourceVersion(o.(runtime.Object)) - new, _ := accessor.ResourceVersion(n.(runtime.Object)) - - // Only enqueue changes that have a different resource versions to avoid processing resyncs. - // Also if there is a registered function to check for updates, use it to check for updates. - if old != new && opts.IsUpdated != nil && opts.IsUpdated(o, n) { + if opts.IsUpdated(o, n) { w.enqueue(n, update) } },