Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add k8s metadata autodetection #13473

Merged
merged 10 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for RFC3339 time zone offsets in JSON output. {pull}13227[13227]
- Add autodetection mode for add_docker_metadata and enable it by default in included configuration files{pull}13374[13374]
- Added `monitoring.cluster_uuid` setting to associate Beat data with specified ES cluster in Stack Monitoring UI. {pull}13182[13182]
- Add autodetection mode for add_kubernetes_metadata and enable it by default in included configuration files. {pull}13473[13473]


*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions auditbeat/auditbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions filebeat/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions journalbeat/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions libbeat/_meta/config.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
{{else}}
processors:
- add_observer_metadata:
Expand Down
50 changes: 39 additions & 11 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kubernetes"
Expand All @@ -33,10 +35,11 @@ const (
)

type kubernetesAnnotator struct {
watcher kubernetes.Watcher
indexers *Indexers
matchers *Matchers
cache *cache
watcher kubernetes.Watcher
indexers *Indexers
matchers *Matchers
cache *cache
kubernetesAvailable bool
}

func init() {
Expand All @@ -51,6 +54,16 @@ func init() {
Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher)
}

func isKubernetesAvailable(client k8sclient.Interface) bool {
server, err := client.Discovery().ServerVersion()
if err != nil {
logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err)
return false
}
logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server)
return true
}

// New constructs a new add_kubernetes_metadata processor.
func New(cfg *common.Config) (processors.Processor, error) {
config := defaultKubernetesAnnotatorConfig()
Expand Down Expand Up @@ -91,9 +104,25 @@ func New(cfg *common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("Can not initialize kubernetes plugin with zero matcher plugins")
}

processor := &kubernetesAnnotator{
indexers: indexers,
matchers: matchers,
cache: newCache(config.CleanupTimeout),
kubernetesAvailable: false,
}

client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
return nil, err
if kubernetes.IsInCluster(config.KubeConfig) {
logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata")
} else {
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig)
}
return processor, nil
}

if !isKubernetesAvailable(client) {
return processor, nil
}

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, kubernetes.IsInCluster(config.KubeConfig), client)
Expand All @@ -111,12 +140,8 @@ func New(cfg *common.Config) (processors.Processor, error) {
return nil, err
}

processor := &kubernetesAnnotator{
watcher: watcher,
indexers: indexers,
matchers: matchers,
cache: newCache(config.CleanupTimeout),
}
processor.watcher = watcher
processor.kubernetesAvailable = true

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -139,6 +164,9 @@ func New(cfg *common.Config) (processors.Processor, error) {
}

func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
if !k.kubernetesAvailable {
return event, nil
}
index := k.matchers.MetadataIndex(event.Fields)
if index == "" {
return event, nil
Expand Down
40 changes: 40 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestAnnotatorDeepUpdate(t *testing.T) {
matchers: &Matchers{
matchers: []Matcher{matcher},
},
kubernetesAvailable: true,
}

processor.cache.set("foo", common.MapStr{
Expand Down Expand Up @@ -86,3 +87,42 @@ func TestAnnotatorDeepUpdate(t *testing.T) {
},
}, event.Fields)
}

// Test metadata are not included in the event
func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) {
cfg := common.MustNewConfigFrom(map[string]interface{}{
"lookup_fields": []string{"kubernetes.pod.name"},
})
matcher, err := NewFieldMatcher(*cfg)
if err != nil {
t.Fatal(err)
}

processor := kubernetesAnnotator{
cache: newCache(10 * time.Second),
matchers: &Matchers{
matchers: []Matcher{matcher},
},
kubernetesAvailable: false,
}

intialEventMap := common.MapStr{
"kubernetes": common.MapStr{
"pod": common.MapStr{
"name": "foo",
"id": "pod_id",
"metrics": common.MapStr{
"a": 1,
"b": 2,
},
},
},
}

event, err := processor.Run(&beat.Event{
Fields: intialEventMap.Clone(),
})
assert.NoError(t, err)

assert.Equal(t, intialEventMap, event.Fields)
}
1 change: 1 addition & 0 deletions metricbeat/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions packetbeat/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions winlogbeat/winlogbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions x-pack/auditbeat/auditbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions x-pack/functionbeat/functionbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down
1 change: 1 addition & 0 deletions x-pack/winlogbeat/winlogbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

#================================ Logging =====================================

Expand Down