Skip to content

Commit

Permalink
Promtail: adding pipeline stage for dropping labels (#2571)
Browse files Browse the repository at this point in the history
* adding pipeline stage for dropping labels, our use case for this is to move unbound labels into the log line and dropping the label, fixes #2421

Signed-off-by: Roger Steneteg <rsteneteg@ea.com>

* Update docs/sources/clients/promtail/stages/labeldrop.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/clients/promtail/stages/_index.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* fixed inconsistent naming, removed unused logger in the labeldrop stage

Signed-off-by: Roger Steneteg <rsteneteg@ea.com>

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
  • Loading branch information
rsteneteg and oddlittlebird authored Sep 10, 2020
1 parent c2e610b commit edb5fc5
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/sources/clients/promtail/stages/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Action stages:

* [timestamp](timestamp/): Set the timestamp value for the log entry.
* [output](output/): Set the log line text.
* [labeldrop](labeldrop/): Drop label set for the log entry.
* [labels](labels/): Update the label set for the log entry.
* [metrics](metrics/): Calculate metrics based on extracted data.
* [tenant](tenant/): Set the tenant ID value to use for the log entry.
Expand All @@ -30,4 +31,3 @@ Filtering stages:

* [match](match/): Conditionally run stages based on the label set.
* [drop](drop/): Conditionally drop log lines based on several options.

36 changes: 36 additions & 0 deletions docs/sources/clients/promtail/stages/labeldrop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
title: labeldrop
---
# `labeldrop` stage

The labeldrop stage is an action stage that takes drops labels from
the label set that is sent to Loki with the log entry.

## Schema

```yaml
labeldrop:
- [<string>]
...
```

### Examples

For the given pipeline:

```yaml
- replace:
expression: "(.*)"
replace: "pod_name:{{ .kubernetes_pod_name }} {{ .Value }}"
- labeldrop:
- kubernetes_pod_name
```
Given the following log line:
```
log message\n
```

The first stage would append the value of the`kubernetes_pod_name` label into the beginning of the log line.
The labeldrop stage would drop the label from being sent to Loki, and it would now be part of the log line instead.
58 changes: 58 additions & 0 deletions pkg/logentry/stages/labeldrop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package stages

import (
"time"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

const (
// ErrEmptyLabelDropStageConfig error returned if config is empty
ErrEmptyLabelDropStageConfig = "labeldrop stage config cannot be empty"
)

// LabelDropConfig is a slice of labels to be dropped
type LabelDropConfig []string

func validateLabelDropConfig(c LabelDropConfig) error {
if c == nil || len(c) < 1 {
return errors.New(ErrEmptyLabelDropStageConfig)
}

return nil
}

func newLabelDropStage(configs interface{}) (*labelDropStage, error) {
cfgs := &LabelDropConfig{}
err := mapstructure.Decode(configs, cfgs)
if err != nil {
return nil, err
}

err = validateLabelDropConfig(*cfgs)
if err != nil {
return nil, err
}

return &labelDropStage{
cfgs: *cfgs,
}, nil
}

type labelDropStage struct {
cfgs LabelDropConfig
}

// Process implements Stage
func (l *labelDropStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for _, label := range l.cfgs {
delete(labels, model.LabelName(label))
}
}

// Name implements Stage
func (l *labelDropStage) Name() string {
return StageTypeLabelDrop
}
69 changes: 69 additions & 0 deletions pkg/logentry/stages/labeldrop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package stages

import (
"testing"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
ww "github.com/weaveworks/common/server"
)

func Test_dropLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
cfg.LogLevel.Set("debug")
util.InitLogger(cfg)
Debug = true

tests := []struct {
name string
config *LabelDropConfig
inputLabels model.LabelSet
expectedLabels model.LabelSet
}{
{
name: "drop one label",
config: &LabelDropConfig{"testLabel1"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel2": "testValue",
},
},
{
name: "drop two labels",
config: &LabelDropConfig{"testLabel1", "testLabel2"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{},
},
{
name: "drop non-existing label",
config: &LabelDropConfig{"foobar"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
st, err := newLabelDropStage(test.config)
if err != nil {
t.Fatal(err)
}
st.Process(test.inputLabels, map[string]interface{}{}, nil, nil)
assert.Equal(t, test.expectedLabels, test.inputLabels)
})
}
}
6 changes: 6 additions & 0 deletions pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
Expand Down Expand Up @@ -77,6 +78,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLabelDrop:
s, err = newLabelDropStage(cfg)
if err != nil {
return nil, err
}
case StageTypeTimestamp:
s, err = newTimestampStage(logger, cfg)
if err != nil {
Expand Down

0 comments on commit edb5fc5

Please sign in to comment.