Skip to content

Commit

Permalink
fluent-bit-plugin: Auto add Kubernetes labels to Loki labels (#1204)
Browse files Browse the repository at this point in the history
* auto parser kubernetes metadata when lineformat set kubernetes

* fix format

* fix test

* refactor

* fix typo, use switch instead of if and go fmt

* fix switch
  • Loading branch information
allanhung authored and cyriltovena committed Nov 27, 2019
1 parent 4202ced commit 30ec4eb
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 8 deletions.
15 changes: 15 additions & 0 deletions cmd/fluent-bit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/flue
| Labels | labels for API requests. | {job="fluent-bit"} |
| LogLevel | LogLevel for plugin logger. | "info" |
| RemoveKeys | Specify removing keys. | none |
| AutoKubernetesLabels | If set to true, it will add all Kubernetes labels to Loki labels | false |
| LabelKeys | Comma separated list of keys to use as stream labels. All other keys will be placed into the log line. LabelKeys is deactivated when using `LabelMapPath` label mapping configuration. | none |
| LineFormat | Format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format <key>=<value>. | json |
| DropSingleKey | If set to true and after extracting label_keys a record only has a single key remaining, the log line sent to Loki will just be the value of the record key.| true |
Expand All @@ -28,6 +29,10 @@ Labels are used to [query logs](../../docs/logql.md) `{container_name="nginx", c

You can use `Labels`, `RemoveKeys` , `LabelKeys` and `LabelMapPath` to how the output plugin will perform labels extraction.

### AutoKubernetesLabels

If set to true, it will add all Kubernetes labels to Loki labels automatically and ignore paramaters `LabelKeys`, LabelMapPath.

### LabelMapPath

When using the `Parser` and `Filter` plugins Fluent Bit can extract and add data to the current record/log data. While Loki labels are key value pair, record data can be nested structures.
Expand Down Expand Up @@ -89,6 +94,16 @@ To configure the Loki output plugin add this section to fluent-bit.conf
LineFormat key_value
```

```properties
[Output]
Name loki
Match *
Url http://localhost:3100/loki/api/v1/push
BatchWait 1 # (1sec)
BatchSize 30720 # (30KiB)
AutoKubernetesLabels true
RemoveKeys key1,key2
```
A full [example configuration file](fluent-bit.conf) is also available in this repository.

## Building
Expand Down
25 changes: 18 additions & 7 deletions cmd/fluent-bit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ const (
)

type config struct {
clientConfig client.Config
logLevel logging.Level
removeKeys []string
labelKeys []string
lineFormat format
dropSingleKey bool
labelMap map[string]interface{}
clientConfig client.Config
logLevel logging.Level
autoKubernetesLabels bool
removeKeys []string
labelKeys []string
lineFormat format
dropSingleKey bool
labelMap map[string]interface{}
}

func parseConfig(cfg ConfigGetter) (*config, error) {
Expand Down Expand Up @@ -106,6 +107,16 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
}
res.logLevel = level

autoKubernetesLabels := cfg.Get("AutoKubernetesLabels")
switch autoKubernetesLabels {
case "false", "":
res.autoKubernetesLabels = false
case "true":
res.autoKubernetesLabels = true
default:
return nil, fmt.Errorf("invalid boolean AutoKubernetesLabels: %v", autoKubernetesLabels)
}

removeKey := cfg.Get("RemoveKeys")
if removeKey != "" {
res.removeKeys = strings.Split(removeKey, ",")
Expand Down
34 changes: 33 additions & 1 deletion cmd/fluent-bit/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"sort"
"strings"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -38,7 +39,9 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
records := toStringMap(r)
level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records))
lbs := model.LabelSet{}
if l.cfg.labelMap != nil {
if l.cfg.autoKubernetesLabels {
lbs = autoLabels(records)
} else if l.cfg.labelMap != nil {
mapLabels(records, l.cfg.labelMap, lbs)
} else {
lbs = extractLabels(records, l.cfg.labelKeys)
Expand Down Expand Up @@ -81,6 +84,35 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} {
return m
}

func autoLabels(records map[string]interface{}) model.LabelSet {
kuberneteslbs := model.LabelSet{}
replacer := strings.NewReplacer("/", "_", ".", "_", "-", "_")
for k, v := range records["kubernetes"].(map[interface{}]interface{}) {
switch key := k.(string); key {
case "labels":
for m, n := range v.(map[interface{}]interface{}) {
switch t := n.(type) {
case []byte:
kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(string(t))
default:
kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(fmt.Sprintf("%v", n))
}
}
case "docker_id", "pod_id", "annotations":
// do nothing
continue
default:
switch t := v.(type) {
case []byte:
kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(string(t))
default:
kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(fmt.Sprintf("%v", v))
}
}
}
return kuberneteslbs
}

func extractLabels(records map[string]interface{}, keys []string) model.LabelSet {
res := model.LabelSet{}
for _, k := range keys {
Expand Down
1 change: 1 addition & 0 deletions cmd/fluent-bit/out_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize)
level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel)
level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
Expand Down

0 comments on commit 30ec4eb

Please sign in to comment.