Skip to content

Commit

Permalink
Remove unnecessary restarts of metricsets while using Node autodiscov…
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel authored and melchiormoulin committed Oct 14, 2020
1 parent 883c0ea commit 03e591e
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Server-side TLS config now validates certificate and key are both specified {pull}19584[19584]
- Fix terminating pod autodiscover issue. {pull}20084[20084]
- Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054]
- Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974]
- Output errors when Kibana index pattern setup fails. {pull}20121[20121]

*Auditbeat*
Expand Down
35 changes: 35 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/gofrs/uuid"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
}, nil)

if err != nil {
Expand Down Expand Up @@ -190,6 +192,39 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
n.publish(event)
}

func isUpdated(o, n interface{}) bool {
old, _ := o.(*kubernetes.Node)
new, _ := n.(*kubernetes.Node)

// Consider as not update in case one of the two objects is not a Node
if old == nil || new == nil {
return true
}

// This is a resync. It is not an update
if old.ResourceVersion == new.ResourceVersion {
return false
}

// If the old object and new object are different
oldCopy := old.DeepCopy()
oldCopy.ResourceVersion = ""

newCopy := new.DeepCopy()
newCopy.ResourceVersion = ""

// If the old object and new object are different in either meta or spec then there is a valid change
if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) {
return true
}

// If there is a change in the node status then there is a valid change.
if isNodeReady(old) != isNodeReady(new) {
return true
}
return false
}

func getAddress(node *kubernetes.Node) string {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeExternalIP && address.Address != "" {
Expand Down
175 changes: 175 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,178 @@ func TestEmitEvent_Node(t *testing.T) {
})
}
}

func TestNode_isUpdated(t *testing.T) {
tests := []struct {
old *kubernetes.Node
new *kubernetes.Node
updated bool
test string
}{
{
test: "one of the objects is nil then its updated",
old: nil,
new: &kubernetes.Node{},
updated: true,
},
{
test: "both empty nodes should return not updated",
old: &kubernetes.Node{},
new: &kubernetes.Node{},
updated: false,
},
{
test: "resource version is the same should return not updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
},
},
},
{
test: "if meta changes then it should return updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
},
updated: true,
},
{
test: "if spec changes then it should return updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: false,
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
},
updated: true,
},
{
test: "if overall status doesn't change then its not an update",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
updated: false,
},
{
test: "if node status changes then its an update",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
updated: true,
},
}

for _, test := range tests {
t.Run(test.test, func(t *testing.T) {
assert.Equal(t, test.updated, isUpdated(test.old, test.new))
})
}
}
22 changes: 17 additions & 5 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type WatchOptions struct {
Node string
// Namespace is used for filtering watched resource to given namespace, use "" for all namespaces
Namespace string
// IsUpdated allows registering a func that allows the invoker of the Watch to decide what amounts to an update
// vs what does not.
IsUpdated func(old, new interface{}) bool
}

type item struct {
Expand Down Expand Up @@ -100,6 +103,19 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType)
ctx, cancel := context.WithCancel(context.Background())

if opts.IsUpdated == nil {
opts.IsUpdated = func(o, n interface{}) bool {
old, _ := accessor.ResourceVersion(o.(runtime.Object))
new, _ := accessor.ResourceVersion(n.(runtime.Object))

// Only enqueue changes that have a different resource versions to avoid processing resyncs.
if old != new {
return true
}
return false
}
}

w := &watcher{
client: client,
informer: informer,
Expand All @@ -119,11 +135,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
w.enqueue(o, delete)
},
UpdateFunc: func(o, n interface{}) {
old, _ := accessor.ResourceVersion(o.(runtime.Object))
new, _ := accessor.ResourceVersion(n.(runtime.Object))

// Only enqueue changes that have a different resource versions to avoid processing resyncs.
if old != new {
if opts.IsUpdated(o, n) {
w.enqueue(n, update)
}
},
Expand Down

0 comments on commit 03e591e

Please sign in to comment.