From edb5fc51762b1f73632eec10933fe045be77d96d Mon Sep 17 00:00:00 2001 From: Roger Steneteg <36709673+rsteneteg@users.noreply.github.com> Date: Thu, 10 Sep 2020 08:42:58 -0500 Subject: [PATCH] Promtail: adding pipeline stage for dropping labels (#2571) * adding pipeline stage for dropping labels, our use case for this is to move unbound labels into the log line and dropping the label, fixes #2421 Signed-off-by: Roger Steneteg * Update docs/sources/clients/promtail/stages/labeldrop.md Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com> * Update docs/sources/clients/promtail/stages/_index.md Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com> * fixed inconsistent naming, removed unused logger in the labeldrop stage Signed-off-by: Roger Steneteg Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com> --- .../sources/clients/promtail/stages/_index.md | 2 +- .../clients/promtail/stages/labeldrop.md | 36 ++++++++++ pkg/logentry/stages/labeldrop.go | 58 ++++++++++++++++ pkg/logentry/stages/labeldrop_test.go | 69 +++++++++++++++++++ pkg/logentry/stages/stage.go | 6 ++ 5 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 docs/sources/clients/promtail/stages/labeldrop.md create mode 100644 pkg/logentry/stages/labeldrop.go create mode 100644 pkg/logentry/stages/labeldrop_test.go diff --git a/docs/sources/clients/promtail/stages/_index.md b/docs/sources/clients/promtail/stages/_index.md index 76bed1c43c434..fa59e75b425e1 100644 --- a/docs/sources/clients/promtail/stages/_index.md +++ b/docs/sources/clients/promtail/stages/_index.md @@ -22,6 +22,7 @@ Action stages: * [timestamp](timestamp/): Set the timestamp value for the log entry. * [output](output/): Set the log line text. + * [labeldrop](labeldrop/): Drop label set for the log entry. * [labels](labels/): Update the label set for the log entry. * [metrics](metrics/): Calculate metrics based on extracted data. * [tenant](tenant/): Set the tenant ID value to use for the log entry. @@ -30,4 +31,3 @@ Filtering stages: * [match](match/): Conditionally run stages based on the label set. * [drop](drop/): Conditionally drop log lines based on several options. - diff --git a/docs/sources/clients/promtail/stages/labeldrop.md b/docs/sources/clients/promtail/stages/labeldrop.md new file mode 100644 index 0000000000000..6559fb558c7ea --- /dev/null +++ b/docs/sources/clients/promtail/stages/labeldrop.md @@ -0,0 +1,36 @@ +--- +title: labeldrop +--- +# `labeldrop` stage + +The labeldrop stage is an action stage that takes drops labels from +the label set that is sent to Loki with the log entry. + +## Schema + +```yaml +labeldrop: + - [] + ... +``` + +### Examples + +For the given pipeline: + +```yaml +- replace: + expression: "(.*)" + replace: "pod_name:{{ .kubernetes_pod_name }} {{ .Value }}" +- labeldrop: + - kubernetes_pod_name +``` + +Given the following log line: + +``` +log message\n +``` + +The first stage would append the value of the`kubernetes_pod_name` label into the beginning of the log line. +The labeldrop stage would drop the label from being sent to Loki, and it would now be part of the log line instead. diff --git a/pkg/logentry/stages/labeldrop.go b/pkg/logentry/stages/labeldrop.go new file mode 100644 index 0000000000000..c04c57feb7bb4 --- /dev/null +++ b/pkg/logentry/stages/labeldrop.go @@ -0,0 +1,58 @@ +package stages + +import ( + "time" + + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +const ( + // ErrEmptyLabelDropStageConfig error returned if config is empty + ErrEmptyLabelDropStageConfig = "labeldrop stage config cannot be empty" +) + +// LabelDropConfig is a slice of labels to be dropped +type LabelDropConfig []string + +func validateLabelDropConfig(c LabelDropConfig) error { + if c == nil || len(c) < 1 { + return errors.New(ErrEmptyLabelDropStageConfig) + } + + return nil +} + +func newLabelDropStage(configs interface{}) (*labelDropStage, error) { + cfgs := &LabelDropConfig{} + err := mapstructure.Decode(configs, cfgs) + if err != nil { + return nil, err + } + + err = validateLabelDropConfig(*cfgs) + if err != nil { + return nil, err + } + + return &labelDropStage{ + cfgs: *cfgs, + }, nil +} + +type labelDropStage struct { + cfgs LabelDropConfig +} + +// Process implements Stage +func (l *labelDropStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { + for _, label := range l.cfgs { + delete(labels, model.LabelName(label)) + } +} + +// Name implements Stage +func (l *labelDropStage) Name() string { + return StageTypeLabelDrop +} diff --git a/pkg/logentry/stages/labeldrop_test.go b/pkg/logentry/stages/labeldrop_test.go new file mode 100644 index 0000000000000..46765cd32a6c1 --- /dev/null +++ b/pkg/logentry/stages/labeldrop_test.go @@ -0,0 +1,69 @@ +package stages + +import ( + "testing" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + ww "github.com/weaveworks/common/server" +) + +func Test_dropLabelStage_Process(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + cfg.LogLevel.Set("debug") + util.InitLogger(cfg) + Debug = true + + tests := []struct { + name string + config *LabelDropConfig + inputLabels model.LabelSet + expectedLabels model.LabelSet + }{ + { + name: "drop one label", + config: &LabelDropConfig{"testLabel1"}, + inputLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + expectedLabels: model.LabelSet{ + "testLabel2": "testValue", + }, + }, + { + name: "drop two labels", + config: &LabelDropConfig{"testLabel1", "testLabel2"}, + inputLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + expectedLabels: model.LabelSet{}, + }, + { + name: "drop non-existing label", + config: &LabelDropConfig{"foobar"}, + inputLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + expectedLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + st, err := newLabelDropStage(test.config) + if err != nil { + t.Fatal(err) + } + st.Process(test.inputLabels, map[string]interface{}{}, nil, nil) + assert.Equal(t, test.expectedLabels, test.inputLabels) + }) + } +} diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index df770aea0f680..144e006ee14f7 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -15,6 +15,7 @@ const ( StageTypeReplace = "replace" StageTypeMetric = "metrics" StageTypeLabel = "labels" + StageTypeLabelDrop = "labeldrop" StageTypeTimestamp = "timestamp" StageTypeOutput = "output" StageTypeDocker = "docker" @@ -77,6 +78,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeLabelDrop: + s, err = newLabelDropStage(cfg) + if err != nil { + return nil, err + } case StageTypeTimestamp: s, err = newTimestampStage(logger, cfg) if err != nil {