Skip to content

Commit

Permalink
Remove unnecessary restarts of metricsets while using Node autodiscover
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel committed Jul 20, 2020
1 parent f74745b commit ca872dd
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix metrics hints builder to avoid wrong container metadata usage when port is not exposed {pull}18979[18979]
- Server-side TLS config now validates certificate and key are both specified {pull}19584[19584]
- Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054]
- Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974]

*Auditbeat*

Expand Down
36 changes: 36 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/gofrs/uuid"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
}, nil)

if err != nil {
Expand Down Expand Up @@ -190,6 +192,40 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
n.publish(event)
}

func isUpdated(o, n interface{}) bool {
old, _ := o.(*kubernetes.Node)
new, _ := n.(*kubernetes.Node)

// Consider as not update in case one of the two objects is not a Node
if old == nil || new == nil {
return true
}

// This is a resync. It is not an update
if old.ResourceVersion == new.ResourceVersion {
return false
}

// If the old object and new object are different
oldCopy := old.DeepCopy()
oldCopy.ResourceVersion = ""

newCopy := new.DeepCopy()
newCopy.ResourceVersion = ""

// If the old object and new object are different in either meta or spec then there is a valid change
fmt.Println(equality.Semantic.DeepDerivative(oldCopy.Spec, newCopy.Spec), equality.Semantic.DeepDerivative(oldCopy.ObjectMeta, newCopy.ObjectMeta))
if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) {
return true
}

// If there is a change in the node status then there is a valid change.
if isNodeReady(old) != isNodeReady(new) {
return true
}
return false
}

func getAddress(node *kubernetes.Node) string {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeExternalIP && address.Address != "" {
Expand Down
175 changes: 175 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,178 @@ func TestEmitEvent_Node(t *testing.T) {
})
}
}

func TestNode_isUpdated(t *testing.T) {
tests := []struct {
old *kubernetes.Node
new *kubernetes.Node
updated bool
test string
}{
{
test: "one of the objects is nil then its updated",
old: nil,
new: &kubernetes.Node{},
updated: true,
},
{
test: "both empty nodes should return not updated",
old: &kubernetes.Node{},
new: &kubernetes.Node{},
updated: false,
},
{
test: "resource version is the same should return not updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
},
},
},
{
test: "if meta changes then it should return updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
},
updated: true,
},
{
test: "if spec changes then it should return updated",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: false,
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
},
updated: true,
},
{
test: "if overall status doesn't change then its not an update",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
updated: false,
},
{
test: "if node status changes then its an update",
old: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "1",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionFalse,
},
},
},
},
new: &kubernetes.Node{
ObjectMeta: kubernetes.ObjectMeta{
ResourceVersion: "2",
Annotations: map[string]string{
"a": "b",
},
},
Spec: v1.NodeSpec{
ProviderID: "1",
Unschedulable: true,
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
updated: true,
},
}

for _, test := range tests {
t.Run(test.test, func(t *testing.T) {
assert.Equal(t, test.updated, isUpdated(test.old, test.new))
})
}
}
5 changes: 4 additions & 1 deletion libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type WatchOptions struct {
Node string
// Namespace is used for filtering watched resource to given namespace, use "" for all namespaces
Namespace string

IsUpdated func(old, new interface{}) bool
}

type item struct {
Expand Down Expand Up @@ -123,7 +125,8 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
new, _ := accessor.ResourceVersion(n.(runtime.Object))

// Only enqueue changes that have a different resource versions to avoid processing resyncs.
if old != new {
// Also if there is a registered function to check for updates, use it to check for updates.
if old != new && opts.IsUpdated != nil && opts.IsUpdated(o, n) {
w.enqueue(n, update)
}
},
Expand Down

0 comments on commit ca872dd

Please sign in to comment.