From 30ec4eba79ec8021d0ce98d4ca811dedba83354d Mon Sep 17 00:00:00 2001 From: allanhung Date: Thu, 28 Nov 2019 03:07:12 +0800 Subject: [PATCH] fluent-bit-plugin: Auto add Kubernetes labels to Loki labels (#1204) * auto parser kubernetes metadata when lineformat set kubernetes * fix format * fix test * refactor * fix typo, use switch instead of if and go fmt * fix switch --- cmd/fluent-bit/README.md | 15 +++++++++++++++ cmd/fluent-bit/config.go | 25 ++++++++++++++++++------- cmd/fluent-bit/loki.go | 34 +++++++++++++++++++++++++++++++++- cmd/fluent-bit/out_loki.go | 1 + 4 files changed, 67 insertions(+), 8 deletions(-) diff --git a/cmd/fluent-bit/README.md b/cmd/fluent-bit/README.md index bfb32689802d8..0ff9f2c5a1122 100644 --- a/cmd/fluent-bit/README.md +++ b/cmd/fluent-bit/README.md @@ -17,6 +17,7 @@ This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/flue | Labels | labels for API requests. | {job="fluent-bit"} | | LogLevel | LogLevel for plugin logger. | "info" | | RemoveKeys | Specify removing keys. | none | +| AutoKubernetesLabels | If set to true, it will add all Kubernetes labels to Loki labels | false | | LabelKeys | Comma separated list of keys to use as stream labels. All other keys will be placed into the log line. LabelKeys is deactivated when using `LabelMapPath` label mapping configuration. | none | | LineFormat | Format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format =. | json | | DropSingleKey | If set to true and after extracting label_keys a record only has a single key remaining, the log line sent to Loki will just be the value of the record key.| true | @@ -28,6 +29,10 @@ Labels are used to [query logs](../../docs/logql.md) `{container_name="nginx", c You can use `Labels`, `RemoveKeys` , `LabelKeys` and `LabelMapPath` to how the output plugin will perform labels extraction. +### AutoKubernetesLabels + +If set to true, it will add all Kubernetes labels to Loki labels automatically and ignore paramaters `LabelKeys`, LabelMapPath. + ### LabelMapPath When using the `Parser` and `Filter` plugins Fluent Bit can extract and add data to the current record/log data. While Loki labels are key value pair, record data can be nested structures. @@ -89,6 +94,16 @@ To configure the Loki output plugin add this section to fluent-bit.conf LineFormat key_value ``` +```properties +[Output] + Name loki + Match * + Url http://localhost:3100/loki/api/v1/push + BatchWait 1 # (1sec) + BatchSize 30720 # (30KiB) + AutoKubernetesLabels true + RemoveKeys key1,key2 +``` A full [example configuration file](fluent-bit.conf) is also available in this repository. ## Building diff --git a/cmd/fluent-bit/config.go b/cmd/fluent-bit/config.go index 6d37d8232123e..11c9d1be34450 100644 --- a/cmd/fluent-bit/config.go +++ b/cmd/fluent-bit/config.go @@ -36,13 +36,14 @@ const ( ) type config struct { - clientConfig client.Config - logLevel logging.Level - removeKeys []string - labelKeys []string - lineFormat format - dropSingleKey bool - labelMap map[string]interface{} + clientConfig client.Config + logLevel logging.Level + autoKubernetesLabels bool + removeKeys []string + labelKeys []string + lineFormat format + dropSingleKey bool + labelMap map[string]interface{} } func parseConfig(cfg ConfigGetter) (*config, error) { @@ -106,6 +107,16 @@ func parseConfig(cfg ConfigGetter) (*config, error) { } res.logLevel = level + autoKubernetesLabels := cfg.Get("AutoKubernetesLabels") + switch autoKubernetesLabels { + case "false", "": + res.autoKubernetesLabels = false + case "true": + res.autoKubernetesLabels = true + default: + return nil, fmt.Errorf("invalid boolean AutoKubernetesLabels: %v", autoKubernetesLabels) + } + removeKey := cfg.Get("RemoveKeys") if removeKey != "" { res.removeKeys = strings.Split(removeKey, ",") diff --git a/cmd/fluent-bit/loki.go b/cmd/fluent-bit/loki.go index 254b6985662d4..f3cab287ec61b 100644 --- a/cmd/fluent-bit/loki.go +++ b/cmd/fluent-bit/loki.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "sort" + "strings" "time" "github.com/go-kit/kit/log" @@ -38,7 +39,9 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error { records := toStringMap(r) level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records)) lbs := model.LabelSet{} - if l.cfg.labelMap != nil { + if l.cfg.autoKubernetesLabels { + lbs = autoLabels(records) + } else if l.cfg.labelMap != nil { mapLabels(records, l.cfg.labelMap, lbs) } else { lbs = extractLabels(records, l.cfg.labelKeys) @@ -81,6 +84,35 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} { return m } +func autoLabels(records map[string]interface{}) model.LabelSet { + kuberneteslbs := model.LabelSet{} + replacer := strings.NewReplacer("/", "_", ".", "_", "-", "_") + for k, v := range records["kubernetes"].(map[interface{}]interface{}) { + switch key := k.(string); key { + case "labels": + for m, n := range v.(map[interface{}]interface{}) { + switch t := n.(type) { + case []byte: + kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(string(t)) + default: + kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(fmt.Sprintf("%v", n)) + } + } + case "docker_id", "pod_id", "annotations": + // do nothing + continue + default: + switch t := v.(type) { + case []byte: + kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(string(t)) + default: + kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(fmt.Sprintf("%v", v)) + } + } + } + return kuberneteslbs +} + func extractLabels(records map[string]interface{}, keys []string) model.LabelSet { res := model.LabelSet{} for _, k := range keys { diff --git a/cmd/fluent-bit/out_loki.go b/cmd/fluent-bit/out_loki.go index 7f19daf50c5b6..638af0d891fb9 100644 --- a/cmd/fluent-bit/out_loki.go +++ b/cmd/fluent-bit/out_loki.go @@ -53,6 +53,7 @@ func FLBPluginInit(ctx unsafe.Pointer) int { level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize) level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels) level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel) + level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels) level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys)) level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys)) level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)