Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary restarts of metricsets while using Node autodiscover #19974

Merged
merged 3 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Server-side TLS config now validates certificate and key are both specified {pull}19584[19584]
- Fix terminating pod autodiscover issue. {pull}20084[20084]
- Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054]
- Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974]
- Output errors when Kibana index pattern setup fails. {pull}20121[20121]

*Auditbeat*
Expand Down
35 changes: 35 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/gofrs/uuid"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
}, nil)

if err != nil {
Expand Down Expand Up @@ -190,6 +192,39 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
n.publish(event)
}

func isUpdated(o, n interface{}) bool {
old, _ := o.(*kubernetes.Node)
new, _ := n.(*kubernetes.Node)

// Consider as not update in case one of the two objects is not a Node
if old == nil || new == nil {
return true
}

// This is a resync. It is not an update
if old.ResourceVersion == new.ResourceVersion {
return false
}

// If the old object and new object are different
oldCopy := old.DeepCopy()
oldCopy.ResourceVersion = ""

newCopy := new.DeepCopy()
newCopy.ResourceVersion = ""

// If the old object and new object are different in either meta or spec then there is a valid change
if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) {
return true
}

// If there is a change in the node status then there is a valid change.
if isNodeReady(old) != isNodeReady(new) {
return true
}
return false
}

func getAddress(node *kubernetes.Node) string {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeExternalIP && address.Address != "" {
Expand Down
175 changes: 175 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,178 @@ func TestEmitEvent_Node(t *testing.T) {
})
}
}

func TestNode_isUpdated(t *testing.T) {
tests := []struct {
old *kubernetes.Node
new *kubernetes.Node
updated bool
test string
}{
{
test: "one of the objects is nil then its updated",
old: nil,
new: &kubernetes.Node{},
updated: true,
},
{
test: "both empty nodes should return not updated",
old: &kubernetes.Node{},
new: &kubernetes.Node{},
updated: false,
},
{
test: "resource version is the same should return not updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
},
},
},
{
test: "if meta changes then it should return updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
},
updated: true,
},
{
test: "if spec changes then it should return updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: false,
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
},
updated: true,
},
{
test: "if overall status doesn't change then its not an update",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
updated: false,
},
{
test: "if node status changes then its an update",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
updated: true,
},
}

for _, test := range tests {
t.Run(test.test, func(t *testing.T) {
assert.Equal(t, test.updated, isUpdated(test.old, test.new))
})
}
}
22 changes: 17 additions & 5 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type WatchOptions struct {
Node string
// Namespace is used for filtering watched resource to given namespace, use "" for all namespaces
Namespace string
// IsUpdated allows registering a func that allows the invoker of the Watch to decide what amounts to an update
// vs what does not.
IsUpdated func(old, new interface{}) bool
}

type item struct {
Expand Down Expand Up @@ -100,6 +103,19 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType)
ctx, cancel := context.WithCancel(context.Background())

if opts.IsUpdated == nil {
opts.IsUpdated = func(o, n interface{}) bool {
old, _ := accessor.ResourceVersion(o.(runtime.Object))
new, _ := accessor.ResourceVersion(n.(runtime.Object))

// Only enqueue changes that have a different resource versions to avoid processing resyncs.
if old != new {
return true
}
return false
}
}

w := &watcher{
client: client,
informer: informer,
Expand All @@ -119,11 +135,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
w.enqueue(o, delete)
},
UpdateFunc: func(o, n interface{}) {
old, _ := accessor.ResourceVersion(o.(runtime.Object))
new, _ := accessor.ResourceVersion(n.(runtime.Object))

// Only enqueue changes that have a different resource versions to avoid processing resyncs.
if old != new {
if opts.IsUpdated(o, n) {
w.enqueue(n, update)
}
},
Expand Down