Skip to content

Commit

Permalink
Add _extracted to non-indexed labels with same name as stream labels (
Browse files Browse the repository at this point in the history
#10082)

**What this PR does / why we need it**:

In #9702 we added support for
returning non-indexed labels in the labels results. The problem is that
non-indexed labels may overwrite stream labels if both are named the
same way. This PR fixes this by adding an `_extracted` suffix if the
non-indexed label is already present in the stream labels.

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
  • Loading branch information
salvacorts authored Jul 28, 2023
1 parent 60e33e3 commit cbb272d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package log

import (
"fmt"
"sort"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -244,9 +245,15 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder {
return b
}

// Add the labels to the builder. If a label with the same name
// already exists in the base labels, a suffix is added to the name.
func (b *LabelsBuilder) Add(labels ...labels.Label) *LabelsBuilder {
for _, l := range labels {
b.Set(l.Name, l.Value)
name := l.Name
if b.BaseHas(name) {
name = fmt.Sprintf("%s%s", name, duplicateSuffix)
}
b.Set(name, l.Value)
}
return b
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/logql/log/metrics_extraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,25 @@ func TestNewLineSampleExtractorWithNonIndexedLabels(t *testing.T) {
require.Equal(t, 1., f)
assertLabelResult(t, expectedLabelsResults, l)

// test duplicated non-indexed labels with stream labels
expectedLabelsResults = append(lbs, labels.Label{
Name: "foo_extracted", Value: "baz",
})
expectedLabelsResults = append(expectedLabelsResults, nonIndexedLabels...)
f, l, ok = sse.Process(0, []byte(`foo`), append(nonIndexedLabels, labels.Label{
Name: "foo", Value: "baz",
})...)
require.True(t, ok)
require.Equal(t, 1., f)
assertLabelResult(t, expectedLabelsResults, l)

f, l, ok = sse.ProcessString(0, `foo`, append(nonIndexedLabels, labels.Label{
Name: "foo", Value: "baz",
})...)
require.True(t, ok)
require.Equal(t, 1., f)
assertLabelResult(t, expectedLabelsResults, l)

se, err = NewLineSampleExtractor(BytesExtractor, []Stage{
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")),
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")),
Expand Down
31 changes: 31 additions & 0 deletions pkg/logql/log/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ func TestNoopPipeline(t *testing.T) {
require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr)
require.Equal(t, true, matches)

// test duplicated non-indexed labels with stream labels
expectedLabelsResults = append(lbs, labels.Label{
Name: "foo_extracted", Value: "baz",
})
expectedLabelsResults = append(expectedLabelsResults, nonIndexedLabels...)
l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), append(nonIndexedLabels, labels.Label{
Name: "foo", Value: "baz",
})...)
require.Equal(t, []byte(""), l)
require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr)
require.Equal(t, true, matches)

pipeline.Reset()
require.Len(t, pipeline.cache, 0)
}
Expand Down Expand Up @@ -100,6 +112,25 @@ func TestPipelineWithNonIndexedLabels(t *testing.T) {
require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr)
require.Equal(t, true, matches)

// test duplicated non-indexed labels with stream labels
expectedLabelsResults = append(lbs, labels.Label{
Name: "foo_extracted", Value: "baz",
})
expectedLabelsResults = append(expectedLabelsResults, nonIndexedLabels...)
l, lbr, matches = p.ForStream(lbs).Process(0, []byte("line"), append(nonIndexedLabels, labels.Label{
Name: "foo", Value: "baz",
})...)
require.Equal(t, []byte("lbs bar bob"), l)
require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr)
require.Equal(t, true, matches)

ls, lbr, matches = p.ForStream(lbs).ProcessString(0, "line", append(nonIndexedLabels, labels.Label{
Name: "foo", Value: "baz",
})...)
require.Equal(t, "lbs bar bob", ls)
require.Equal(t, NewLabelsResult(expectedLabelsResults, expectedLabelsResults.Hash()), lbr)
require.Equal(t, true, matches)

l, lbr, matches = p.ForStream(lbs).Process(0, []byte("line"))
require.Equal(t, []byte(nil), l)
require.Equal(t, nil, lbr)
Expand Down

0 comments on commit cbb272d

Please sign in to comment.