From 4fd9d44068c0216c5a4b3250e1ae0aee2efe2c55 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 3 Sep 2019 16:04:45 +0300 Subject: [PATCH 01/10] Add k8s conn check Signed-off-by: chrismark --- auditbeat/auditbeat.yml | 1 + filebeat/filebeat.yml | 1 + journalbeat/journalbeat.yml | 1 + libbeat/_meta/config.yml.tmpl | 1 + .../add_kubernetes_metadata/config.go | 2 + .../add_kubernetes_metadata/kubernetes.go | 43 +++++++++++++++---- metricbeat/metricbeat.yml | 1 + packetbeat/packetbeat.yml | 1 + winlogbeat/winlogbeat.yml | 1 + x-pack/auditbeat/auditbeat.yml | 1 + x-pack/filebeat/filebeat.yml | 1 + x-pack/functionbeat/functionbeat.yml | 1 + x-pack/metricbeat/metricbeat.yml | 1 + x-pack/winlogbeat/winlogbeat.yml | 1 + 14 files changed, 49 insertions(+), 8 deletions(-) diff --git a/auditbeat/auditbeat.yml b/auditbeat/auditbeat.yml index d3db66e60d57..a914f510073c 100644 --- a/auditbeat/auditbeat.yml +++ b/auditbeat/auditbeat.yml @@ -149,6 +149,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/filebeat/filebeat.yml b/filebeat/filebeat.yml index a404af47c161..d02b4d161fbf 100644 --- a/filebeat/filebeat.yml +++ b/filebeat/filebeat.yml @@ -177,6 +177,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index ee2ce4502f05..5e8aad8b4ad4 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -146,6 +146,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/libbeat/_meta/config.yml.tmpl b/libbeat/_meta/config.yml.tmpl index e687c0cc7679..9ef6a1c8adc7 100644 --- a/libbeat/_meta/config.yml.tmpl +++ b/libbeat/_meta/config.yml.tmpl @@ -94,6 +94,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ {{else}} processors: - add_observer_metadata: diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 84d2c246948d..9cd2be839b6b 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -18,6 +18,7 @@ package add_kubernetes_metadata import ( + "os" "time" "github.com/elastic/beats/libbeat/common" @@ -45,6 +46,7 @@ type PluginConfig []map[string]common.Config func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig { return kubeAnnotatorConfig{ + KubeConfig: os.Getenv("HOME") + "/.kube/config", SyncPeriod: 10 * time.Minute, CleanupTimeout: 60 * time.Second, DefaultMatchers: Enabled{true}, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index ea635314b079..054373e327c2 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -26,6 +26,9 @@ import ( "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8s "k8s.io/client-go/kubernetes" ) const ( @@ -33,10 +36,11 @@ const ( ) type kubernetesAnnotator struct { - watcher kubernetes.Watcher - indexers *Indexers - matchers *Matchers - cache *cache + watcher kubernetes.Watcher + indexers *Indexers + matchers *Matchers + cache *cache + kubernetesAvailable bool } func init() { @@ -51,6 +55,16 @@ func init() { Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher) } +func isKubernetesAvailable(client k8s.Interface) bool { + info, err := client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + logp.Debug("kubernetes", "%v: could not detect kubernetes env", "add_kubernetes_metadata") + return false + } + logp.Debug("kubernetes", "%v: kubernetes env detected: %v", "add_kubernetes_metadata", info) + return true +} + // New constructs a new add_kubernetes_metadata processor. func New(cfg *common.Config) (processors.Processor, error) { config := defaultKubernetesAnnotatorConfig() @@ -96,6 +110,15 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, err } + if !isKubernetesAvailable(client) { + return &kubernetesAnnotator{ + indexers: indexers, + matchers: matchers, + cache: newCache(config.CleanupTimeout), + kubernetesAvailable: false, + }, nil + } + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, kubernetes.IsInCluster(config.KubeConfig), client) logp.Debug("kubernetes", "Using host: %s", config.Host) @@ -112,10 +135,11 @@ func New(cfg *common.Config) (processors.Processor, error) { } processor := &kubernetesAnnotator{ - watcher: watcher, - indexers: indexers, - matchers: matchers, - cache: newCache(config.CleanupTimeout), + watcher: watcher, + indexers: indexers, + matchers: matchers, + cache: newCache(config.CleanupTimeout), + kubernetesAvailable: true, } watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ @@ -139,6 +163,9 @@ func New(cfg *common.Config) (processors.Processor, error) { } func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) { + if !k.kubernetesAvailable { + return event, nil + } index := k.matchers.MetadataIndex(event.Fields) if index == "" { return event, nil diff --git a/metricbeat/metricbeat.yml b/metricbeat/metricbeat.yml index 8fb745093b8b..6d2d0519b442 100644 --- a/metricbeat/metricbeat.yml +++ b/metricbeat/metricbeat.yml @@ -121,6 +121,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/packetbeat/packetbeat.yml b/packetbeat/packetbeat.yml index 0a306eaeb6bf..965e5c5975a6 100644 --- a/packetbeat/packetbeat.yml +++ b/packetbeat/packetbeat.yml @@ -203,6 +203,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/winlogbeat/winlogbeat.yml b/winlogbeat/winlogbeat.yml index 253c5d4131f1..7d900eea1a14 100644 --- a/winlogbeat/winlogbeat.yml +++ b/winlogbeat/winlogbeat.yml @@ -128,6 +128,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/x-pack/auditbeat/auditbeat.yml b/x-pack/auditbeat/auditbeat.yml index 814609baefe9..a4d0f54f1cb7 100644 --- a/x-pack/auditbeat/auditbeat.yml +++ b/x-pack/auditbeat/auditbeat.yml @@ -171,6 +171,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/x-pack/filebeat/filebeat.yml b/x-pack/filebeat/filebeat.yml index a404af47c161..d02b4d161fbf 100644 --- a/x-pack/filebeat/filebeat.yml +++ b/x-pack/filebeat/filebeat.yml @@ -177,6 +177,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/x-pack/functionbeat/functionbeat.yml b/x-pack/functionbeat/functionbeat.yml index 06bf3681dd87..8182bddeced6 100644 --- a/x-pack/functionbeat/functionbeat.yml +++ b/x-pack/functionbeat/functionbeat.yml @@ -345,6 +345,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/x-pack/metricbeat/metricbeat.yml b/x-pack/metricbeat/metricbeat.yml index 8fb745093b8b..6d2d0519b442 100644 --- a/x-pack/metricbeat/metricbeat.yml +++ b/x-pack/metricbeat/metricbeat.yml @@ -121,6 +121,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== diff --git a/x-pack/winlogbeat/winlogbeat.yml b/x-pack/winlogbeat/winlogbeat.yml index 35ddff7e9fa0..690ed27a9eac 100644 --- a/x-pack/winlogbeat/winlogbeat.yml +++ b/x-pack/winlogbeat/winlogbeat.yml @@ -140,6 +140,7 @@ processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ #================================ Logging ===================================== From 894f5b0126c5b453eb50650c5118a28e82f32690 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 4 Sep 2019 17:56:55 +0300 Subject: [PATCH 02/10] Add tests and changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 2 + .../kubernetes_test.go | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 397070b9f254..95b4922bd83e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -248,6 +248,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for RFC3339 time zone offsets in JSON output. {pull}13227[13227] - Add autodetection mode for add_docker_metadata and enable it by default in included configuration files{pull}13374[13374] - Added `monitoring.cluster_uuid` setting to associate Beat data with specified ES cluster in Stack Monitoring UI. {pull}13182[13182] +- Add autodetection mode for add_kubernetes_metadata and enable it by default in included configuration files{pull}13473[13473] + *Auditbeat* diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go b/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go index c3467b5c96f2..d437bf815f11 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go @@ -42,6 +42,7 @@ func TestAnnotatorDeepUpdate(t *testing.T) { matchers: &Matchers{ matchers: []Matcher{matcher}, }, + kubernetesAvailable: true, } processor.cache.set("foo", common.MapStr{ @@ -86,3 +87,51 @@ func TestAnnotatorDeepUpdate(t *testing.T) { }, }, event.Fields) } + +// Test metadata are not included in the event +func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) { + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "lookup_fields": []string{"kubernetes.pod.name"}, + }) + matcher, err := NewFieldMatcher(*cfg) + if err != nil { + t.Fatal(err) + } + + processor := kubernetesAnnotator{ + cache: newCache(10 * time.Second), + matchers: &Matchers{ + matchers: []Matcher{matcher}, + }, + kubernetesAvailable: false, + } + + processor.cache.set("foo", common.MapStr{ + "pod": common.MapStr{ + "labels": common.MapStr{ + "dont": "replace", + "original": "fields", + }, + }, + }) + + intialEventMap := common.MapStr{ + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "foo", + "id": "pod_id", + "metrics": common.MapStr{ + "a": 1, + "b": 2, + }, + }, + }, + } + + event, err := processor.Run(&beat.Event{ + Fields: intialEventMap.Clone(), + }) + assert.NoError(t, err) + + assert.Equal(t, intialEventMap, event.Fields) +} From 2a9577fe5f61776279327662ef57739f0cd1ddef Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 10 Sep 2019 10:14:40 +0300 Subject: [PATCH 03/10] Improve k8s environment detection Signed-off-by: chrismark --- .../add_kubernetes_metadata/config.go | 47 ++++++++++++++++++- .../add_kubernetes_metadata/kubernetes.go | 33 +++++++------ 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 9cd2be839b6b..519099b14b86 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -19,6 +19,8 @@ package add_kubernetes_metadata import ( "os" + "os/exec" + "strings" "time" "github.com/elastic/beats/libbeat/common" @@ -44,9 +46,52 @@ type Enabled struct { type PluginConfig []map[string]common.Config +func trimAfter(value string, a string) string { + // Get substring after a string. + pos := strings.LastIndex(value, a) + if pos == -1 { + return "" + } + adjustedPos := pos + len(a) + if adjustedPos >= len(value) { + return "" + } + return value[adjustedPos:len(value)] +} + +func findDaemonsConfigFlag() string { + // Search in processes and search for config in flags + psRes, _ := exec.Command("bash", "-c", "ps aux | grep -- -kubeconfig=").Output() + psResStr := string(psRes) + trimmedPsResStr := trimAfter(psResStr, "--kubeconfig=") + arr := strings.Split(trimmedPsResStr, " ") + kubeConfigPath := arr[0] + return kubeConfigPath +} + +func getSystemKubeConfig() string { + homeKubeConfig := os.Getenv("HOME") + "/.kube/config" + if _, err := os.Stat(homeKubeConfig); !os.IsNotExist(err) { + return homeKubeConfig + } + envKubeConfig := os.Getenv("KUBECONFIG") + if _, err := os.Stat(envKubeConfig); !os.IsNotExist(err) { + return envKubeConfig + } + kubeConfigPath := findDaemonsConfigFlag() + if _, err := os.Stat(kubeConfigPath); !os.IsNotExist(err) { + return kubeConfigPath + } + kubeletConfig := "/etc/kubernetes/kubelet.conf" + if _, err := os.Stat(kubeletConfig); !os.IsNotExist(err) { + return kubeletConfig + } + return homeKubeConfig +} + func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig { return kubeAnnotatorConfig{ - KubeConfig: os.Getenv("HOME") + "/.kube/config", + KubeConfig: getSystemKubeConfig(), SyncPeriod: 10 * time.Minute, CleanupTimeout: 60 * time.Second, DefaultMatchers: Enabled{true}, diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 054373e327c2..0edde1eb1c18 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -27,7 +27,6 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8s "k8s.io/client-go/kubernetes" ) @@ -56,12 +55,12 @@ func init() { } func isKubernetesAvailable(client k8s.Interface) bool { - info, err := client.CoreV1().Nodes().List(metav1.ListOptions{}) + server, err := client.Discovery().ServerVersion() if err != nil { logp.Debug("kubernetes", "%v: could not detect kubernetes env", "add_kubernetes_metadata") return false } - logp.Debug("kubernetes", "%v: kubernetes env detected: %v", "add_kubernetes_metadata", info) + logp.Debug("kubernetes", "%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server) return true } @@ -105,20 +104,26 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("Can not initialize kubernetes plugin with zero matcher plugins") } + processor := &kubernetesAnnotator{ + indexers: indexers, + matchers: matchers, + cache: newCache(config.CleanupTimeout), + } + client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { - return nil, err + logp.Debug("kubernetes", "%v: could not create kubernetes client with config: %v", "add_kubernetes_metadata", config.KubeConfig) + processor.kubernetesAvailable = false + return processor, nil } if !isKubernetesAvailable(client) { - return &kubernetesAnnotator{ - indexers: indexers, - matchers: matchers, - cache: newCache(config.CleanupTimeout), - kubernetesAvailable: false, - }, nil + processor.kubernetesAvailable = false + return processor, nil } + processor.kubernetesAvailable = true + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, kubernetes.IsInCluster(config.KubeConfig), client) logp.Debug("kubernetes", "Using host: %s", config.Host) @@ -134,13 +139,7 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, err } - processor := &kubernetesAnnotator{ - watcher: watcher, - indexers: indexers, - matchers: matchers, - cache: newCache(config.CleanupTimeout), - kubernetesAvailable: true, - } + processor.watcher = watcher watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { From cbd16eb2858d74d6db5c733f458e9111bff979f9 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 10 Sep 2019 13:36:42 +0300 Subject: [PATCH 04/10] Improve detection and add InClusterConfig check Signed-off-by: chrismark --- libbeat/common/kubernetes/util.go | 22 +++++++++-- .../add_kubernetes_metadata/config.go | 39 ++----------------- 2 files changed, 21 insertions(+), 40 deletions(-) diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index 30385e4c76c5..6a5ef2e68e94 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -23,6 +23,8 @@ import ( "os" "strings" + "k8s.io/client-go/rest" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -32,16 +34,28 @@ import ( const defaultNode = "localhost" -// GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an -// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, -// it parses the config file to get the config required to build a client. +// GetKubernetesClient returns a kubernetes client. If InClusterConfig is detected, it returns a +// a client from cluster configuration based on the secrets mounted in the Pod. Otherwise, +// it parses kubeConfig file to get the config required to build a client. func GetKubernetesClient(kubeconfig string) (kubernetes.Interface, error) { + var client *kubernetes.Clientset + var err error + + clusterConfig, err := rest.InClusterConfig() + if err == nil { + client, err = kubernetes.NewForConfig(clusterConfig) + if err != nil { + return nil, fmt.Errorf("unable to build kubernetes clientset: %+v", err) + } + + return client, nil + } cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, fmt.Errorf("unable to build kube config due to error: %+v", err) } - client, err := kubernetes.NewForConfig(cfg) + client, err = kubernetes.NewForConfig(cfg) if err != nil { return nil, fmt.Errorf("unable to build kubernetes clientset: %+v", err) } diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 519099b14b86..50fc6a4361d2 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -19,8 +19,6 @@ package add_kubernetes_metadata import ( "os" - "os/exec" - "strings" "time" "github.com/elastic/beats/libbeat/common" @@ -46,45 +44,14 @@ type Enabled struct { type PluginConfig []map[string]common.Config -func trimAfter(value string, a string) string { - // Get substring after a string. - pos := strings.LastIndex(value, a) - if pos == -1 { - return "" - } - adjustedPos := pos + len(a) - if adjustedPos >= len(value) { - return "" - } - return value[adjustedPos:len(value)] -} - -func findDaemonsConfigFlag() string { - // Search in processes and search for config in flags - psRes, _ := exec.Command("bash", "-c", "ps aux | grep -- -kubeconfig=").Output() - psResStr := string(psRes) - trimmedPsResStr := trimAfter(psResStr, "--kubeconfig=") - arr := strings.Split(trimmedPsResStr, " ") - kubeConfigPath := arr[0] - return kubeConfigPath -} - func getSystemKubeConfig() string { - homeKubeConfig := os.Getenv("HOME") + "/.kube/config" - if _, err := os.Stat(homeKubeConfig); !os.IsNotExist(err) { - return homeKubeConfig - } envKubeConfig := os.Getenv("KUBECONFIG") if _, err := os.Stat(envKubeConfig); !os.IsNotExist(err) { return envKubeConfig } - kubeConfigPath := findDaemonsConfigFlag() - if _, err := os.Stat(kubeConfigPath); !os.IsNotExist(err) { - return kubeConfigPath - } - kubeletConfig := "/etc/kubernetes/kubelet.conf" - if _, err := os.Stat(kubeletConfig); !os.IsNotExist(err) { - return kubeletConfig + homeKubeConfig := os.Getenv("HOME") + "/.kube/config" + if _, err := os.Stat(homeKubeConfig); !os.IsNotExist(err) { + return homeKubeConfig } return homeKubeConfig } From edf97934f074a560d0eb8cdfdc3f64ae32abddb6 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 10 Sep 2019 15:13:39 +0300 Subject: [PATCH 05/10] Move inCluster Check in config initialization Signed-off-by: chrismark --- libbeat/common/kubernetes/util.go | 21 ++++--------------- .../add_kubernetes_metadata/config.go | 12 +++++++++++ .../add_kubernetes_metadata/kubernetes.go | 5 +++++ 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index 6a5ef2e68e94..871f304b7e42 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -23,8 +23,6 @@ import ( "os" "strings" - "k8s.io/client-go/rest" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -34,28 +32,17 @@ import ( const defaultNode = "localhost" -// GetKubernetesClient returns a kubernetes client. If InClusterConfig is detected, it returns a -// a client from cluster configuration based on the secrets mounted in the Pod. Otherwise, -// it parses kubeConfig file to get the config required to build a client. +// GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an +// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, +// it parses the config file to get the config required to build a client. func GetKubernetesClient(kubeconfig string) (kubernetes.Interface, error) { - var client *kubernetes.Clientset - var err error - clusterConfig, err := rest.InClusterConfig() - if err == nil { - client, err = kubernetes.NewForConfig(clusterConfig) - if err != nil { - return nil, fmt.Errorf("unable to build kubernetes clientset: %+v", err) - } - - return client, nil - } cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, fmt.Errorf("unable to build kube config due to error: %+v", err) } - client, err = kubernetes.NewForConfig(cfg) + client, err := kubernetes.NewForConfig(cfg) if err != nil { return nil, fmt.Errorf("unable to build kubernetes clientset: %+v", err) } diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 50fc6a4361d2..844c809b6f7f 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -21,6 +21,8 @@ import ( "os" "time" + "k8s.io/client-go/rest" + "github.com/elastic/beats/libbeat/common" ) @@ -36,6 +38,7 @@ type kubeAnnotatorConfig struct { Matchers PluginConfig `config:"matchers"` DefaultMatchers Enabled `config:"default_matchers"` DefaultIndexers Enabled `config:"default_indexers"` + InCluster bool } type Enabled struct { @@ -56,6 +59,14 @@ func getSystemKubeConfig() string { return homeKubeConfig } +func inCluster() bool { + _, err := rest.InClusterConfig() + if err == nil { + return true + } + return false +} + func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig { return kubeAnnotatorConfig{ KubeConfig: getSystemKubeConfig(), @@ -63,5 +74,6 @@ func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig { CleanupTimeout: 60 * time.Second, DefaultMatchers: Enabled{true}, DefaultIndexers: Enabled{true}, + InCluster: inCluster(), } } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 0edde1eb1c18..241e9e43e90c 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -73,6 +73,11 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err) } + // In cluster mode is already detected so ignore any given config + if config.InCluster == true { + config.KubeConfig = "" + } + //Load default indexer configs if config.DefaultIndexers.Enabled == true { Indexing.RLock() From 3c878ef8d71736bd244b056d5b140bab5fc6528c Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 11 Sep 2019 11:21:25 +0300 Subject: [PATCH 06/10] Minor improvements Signed-off-by: chrismark --- .../processors/add_kubernetes_metadata/config.go | 2 +- .../add_kubernetes_metadata/kubernetes.go | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 844c809b6f7f..b17bd13c01da 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -56,7 +56,7 @@ func getSystemKubeConfig() string { if _, err := os.Stat(homeKubeConfig); !os.IsNotExist(err) { return homeKubeConfig } - return homeKubeConfig + return "" } func inCluster() bool { diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 241e9e43e90c..f488581633f3 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -57,7 +57,7 @@ func init() { func isKubernetesAvailable(client k8s.Interface) bool { server, err := client.Discovery().ServerVersion() if err != nil { - logp.Debug("kubernetes", "%v: could not detect kubernetes env", "add_kubernetes_metadata") + logp.Debug("kubernetes", "%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) return false } logp.Debug("kubernetes", "%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server) @@ -73,11 +73,6 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err) } - // In cluster mode is already detected so ignore any given config - if config.InCluster == true { - config.KubeConfig = "" - } - //Load default indexer configs if config.DefaultIndexers.Enabled == true { Indexing.RLock() @@ -117,7 +112,11 @@ func New(cfg *common.Config) (processors.Processor, error) { client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { - logp.Debug("kubernetes", "%v: could not create kubernetes client with config: %v", "add_kubernetes_metadata", config.KubeConfig) + if config.InCluster { + logp.Info("%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") + } else { + logp.Info("%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) + } processor.kubernetesAvailable = false return processor, nil } From fccdf5e05a6b91d6dd3bbf0397f007123785f897 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 11 Sep 2019 11:52:44 +0300 Subject: [PATCH 07/10] Make kubeconfig path creation OS generic Signed-off-by: chrismark --- libbeat/processors/add_kubernetes_metadata/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index b17bd13c01da..06c3ed0c6157 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -19,6 +19,7 @@ package add_kubernetes_metadata import ( "os" + "path/filepath" "time" "k8s.io/client-go/rest" @@ -52,7 +53,7 @@ func getSystemKubeConfig() string { if _, err := os.Stat(envKubeConfig); !os.IsNotExist(err) { return envKubeConfig } - homeKubeConfig := os.Getenv("HOME") + "/.kube/config" + homeKubeConfig := filepath.Join(os.Getenv("HOME"), ".kube", "config") if _, err := os.Stat(homeKubeConfig); !os.IsNotExist(err) { return homeKubeConfig } From 965eff14467d1d0e15afbbe6eeab0ab434e65146 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 11 Sep 2019 12:33:03 +0300 Subject: [PATCH 08/10] Change logging level Signed-off-by: chrismark --- libbeat/processors/add_kubernetes_metadata/kubernetes.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index f488581633f3..14f0332e0779 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -57,10 +57,10 @@ func init() { func isKubernetesAvailable(client k8s.Interface) bool { server, err := client.Discovery().ServerVersion() if err != nil { - logp.Debug("kubernetes", "%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) + logp.Err("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) return false } - logp.Debug("kubernetes", "%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server) + logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server) return true } @@ -113,9 +113,9 @@ func New(cfg *common.Config) (processors.Processor, error) { client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { if config.InCluster { - logp.Info("%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") + logp.Err("%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") } else { - logp.Info("%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) + logp.Err("%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) } processor.kubernetesAvailable = false return processor, nil From 442f348d4554e03450293da5532cca933404d1f8 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 11 Sep 2019 16:03:09 +0300 Subject: [PATCH 09/10] Finalizing autodection checks Signed-off-by: chrismark --- .../add_kubernetes_metadata/config.go | 27 ------------------- .../add_kubernetes_metadata/kubernetes.go | 15 +++++------ .../kubernetes_test.go | 9 ------- 3 files changed, 7 insertions(+), 44 deletions(-) diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 06c3ed0c6157..84d2c246948d 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -18,12 +18,8 @@ package add_kubernetes_metadata import ( - "os" - "path/filepath" "time" - "k8s.io/client-go/rest" - "github.com/elastic/beats/libbeat/common" ) @@ -39,7 +35,6 @@ type kubeAnnotatorConfig struct { Matchers PluginConfig `config:"matchers"` DefaultMatchers Enabled `config:"default_matchers"` DefaultIndexers Enabled `config:"default_indexers"` - InCluster bool } type Enabled struct { @@ -48,33 +43,11 @@ type Enabled struct { type PluginConfig []map[string]common.Config -func getSystemKubeConfig() string { - envKubeConfig := os.Getenv("KUBECONFIG") - if _, err := os.Stat(envKubeConfig); !os.IsNotExist(err) { - return envKubeConfig - } - homeKubeConfig := filepath.Join(os.Getenv("HOME"), ".kube", "config") - if _, err := os.Stat(homeKubeConfig); !os.IsNotExist(err) { - return homeKubeConfig - } - return "" -} - -func inCluster() bool { - _, err := rest.InClusterConfig() - if err == nil { - return true - } - return false -} - func defaultKubernetesAnnotatorConfig() kubeAnnotatorConfig { return kubeAnnotatorConfig{ - KubeConfig: getSystemKubeConfig(), SyncPeriod: 10 * time.Minute, CleanupTimeout: 60 * time.Second, DefaultMatchers: Enabled{true}, DefaultIndexers: Enabled{true}, - InCluster: inCluster(), } } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 14f0332e0779..70b980cb6119 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -57,7 +57,7 @@ func init() { func isKubernetesAvailable(client k8s.Interface) bool { server, err := client.Discovery().ServerVersion() if err != nil { - logp.Err("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) + logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) return false } logp.Info("%v: kubernetes env detected, with version: %v", "add_kubernetes_metadata", server) @@ -105,14 +105,15 @@ func New(cfg *common.Config) (processors.Processor, error) { } processor := &kubernetesAnnotator{ - indexers: indexers, - matchers: matchers, - cache: newCache(config.CleanupTimeout), + indexers: indexers, + matchers: matchers, + cache: newCache(config.CleanupTimeout), + kubernetesAvailable: false, } client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { - if config.InCluster { + if kubernetes.IsInCluster(config.KubeConfig) { logp.Err("%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") } else { logp.Err("%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) @@ -122,12 +123,9 @@ func New(cfg *common.Config) (processors.Processor, error) { } if !isKubernetesAvailable(client) { - processor.kubernetesAvailable = false return processor, nil } - processor.kubernetesAvailable = true - config.Host = kubernetes.DiscoverKubernetesNode(config.Host, kubernetes.IsInCluster(config.KubeConfig), client) logp.Debug("kubernetes", "Using host: %s", config.Host) @@ -144,6 +142,7 @@ func New(cfg *common.Config) (processors.Processor, error) { } processor.watcher = watcher + processor.kubernetesAvailable = true watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go b/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go index d437bf815f11..4da15db969b4 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go @@ -106,15 +106,6 @@ func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) { kubernetesAvailable: false, } - processor.cache.set("foo", common.MapStr{ - "pod": common.MapStr{ - "labels": common.MapStr{ - "dont": "replace", - "original": "fields", - }, - }, - }) - intialEventMap := common.MapStr{ "kubernetes": common.MapStr{ "pod": common.MapStr{ From 9cc29e54f95a4b99d00be81d3b31e793cfee5d09 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 12 Sep 2019 09:58:02 +0300 Subject: [PATCH 10/10] Minor improvements Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 2 +- libbeat/common/kubernetes/util.go | 1 - .../processors/add_kubernetes_metadata/kubernetes.go | 11 +++++------ 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 95b4922bd83e..cbf55f4bb9c2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -248,7 +248,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for RFC3339 time zone offsets in JSON output. {pull}13227[13227] - Add autodetection mode for add_docker_metadata and enable it by default in included configuration files{pull}13374[13374] - Added `monitoring.cluster_uuid` setting to associate Beat data with specified ES cluster in Stack Monitoring UI. {pull}13182[13182] -- Add autodetection mode for add_kubernetes_metadata and enable it by default in included configuration files{pull}13473[13473] +- Add autodetection mode for add_kubernetes_metadata and enable it by default in included configuration files. {pull}13473[13473] *Auditbeat* diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index 871f304b7e42..30385e4c76c5 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -36,7 +36,6 @@ const defaultNode = "localhost" // in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, // it parses the config file to get the config required to build a client. func GetKubernetesClient(kubeconfig string) (kubernetes.Interface, error) { - cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, fmt.Errorf("unable to build kube config due to error: %+v", err) diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 70b980cb6119..c9461c35bf99 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -21,13 +21,13 @@ import ( "fmt" "time" + k8sclient "k8s.io/client-go/kubernetes" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - - k8s "k8s.io/client-go/kubernetes" ) const ( @@ -54,7 +54,7 @@ func init() { Indexing.AddMatcher(FieldFormatMatcherName, NewFieldFormatMatcher) } -func isKubernetesAvailable(client k8s.Interface) bool { +func isKubernetesAvailable(client k8sclient.Interface) bool { server, err := client.Discovery().ServerVersion() if err != nil { logp.Info("%v: could not detect kubernetes env: %v", "add_kubernetes_metadata", err) @@ -114,11 +114,10 @@ func New(cfg *common.Config) (processors.Processor, error) { client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { if kubernetes.IsInCluster(config.KubeConfig) { - logp.Err("%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") + logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") } else { - logp.Err("%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) + logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) } - processor.kubernetesAvailable = false return processor, nil }