Skip to content

Commit

Permalink
Improve labels computation during LogQL pipeline (#6110)
Browse files Browse the repository at this point in the history
* Improve labels computation during LogQL pipeline

* Fixes a bug that was mutating the base labels
  • Loading branch information
cyriltovena authored May 24, 2022
1 parent 0dfd2cb commit 67e7285
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 48 deletions.
4 changes: 2 additions & 2 deletions pkg/logql/log/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
lf.buf.Reset()
lf.currentLine = line

if err := lf.Template.Execute(lf.buf, lbs.Labels().Map()); err != nil {
if err := lf.Template.Execute(lf.buf, lbs.Map()); err != nil {
lbs.SetErr(errTemplateFormat)
return line, true
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func (lf *LabelsFormatter) Process(l []byte, lbs *LabelsBuilder) ([]byte, bool)
}
lf.buf.Reset()
if data == nil {
data = lbs.Labels().Map()
data = lbs.Map()
}
if err := f.tmpl.Execute(lf.buf, data); err != nil {
lbs.SetErr(errTemplateFormat)
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/log/fmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func Test_lineFormatter_Format(t *testing.T) {
builder.Reset()
outLine, _ := tt.fmter.Process(tt.in, builder)
require.Equal(t, tt.want, outLine)
require.Equal(t, tt.wantLbs, builder.Labels())
require.Equal(t, tt.wantLbs, builder.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func Test_labelsFormatter_Format(t *testing.T) {
builder.Reset()
_, _ = tt.fmter.Process(nil, builder)
sort.Sort(tt.want)
require.Equal(t, tt.want, builder.Labels())
require.Equal(t, tt.want, builder.LabelsResult().Labels())
})
}
}
Expand Down
16 changes: 7 additions & 9 deletions pkg/logql/log/label_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
)

func TestBinary_Filter(t *testing.T) {

tests := []struct {
f LabelFilterer
lbs labels.Labels
Expand Down Expand Up @@ -158,7 +157,7 @@ func TestBinary_Filter(t *testing.T) {
_, got := tt.f.Process(nil, b)
require.Equal(t, tt.want, got)
sort.Sort(tt.wantLbs)
require.Equal(t, tt.wantLbs, b.Labels())
require.Equal(t, tt.wantLbs, b.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -192,7 +191,7 @@ func TestBytes_Filter(t *testing.T) {
_, got := f.Process(nil, b)
require.Equal(t, tt.want, got)
wantLbs := labels.Labels{{Name: "bar", Value: tt.wantLabel}}
require.Equal(t, wantLbs, b.Labels())
require.Equal(t, wantLbs, b.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -221,7 +220,6 @@ func TestErrorFiltering(t *testing.T) {
},
},
{

NewStringLabelFilter(labels.MustNewMatcher(labels.MatchNotRegexp, logqlmodel.ErrorLabel, ".+")),
labels.Labels{
{Name: "status", Value: "200"},
Expand All @@ -236,7 +234,6 @@ func TestErrorFiltering(t *testing.T) {
},
},
{

NewStringLabelFilter(labels.MustNewMatcher(labels.MatchNotRegexp, logqlmodel.ErrorLabel, ".+")),
labels.Labels{
{Name: "status", Value: "200"},
Expand All @@ -250,7 +247,6 @@ func TestErrorFiltering(t *testing.T) {
},
},
{

NewStringLabelFilter(labels.MustNewMatcher(labels.MatchNotEqual, logqlmodel.ErrorLabel, errJSON)),
labels.Labels{
{Name: "status", Value: "200"},
Expand All @@ -273,7 +269,7 @@ func TestErrorFiltering(t *testing.T) {
_, got := tt.f.Process(nil, b)
require.Equal(t, tt.want, got)
sort.Sort(tt.wantLbs)
require.Equal(t, tt.wantLbs, b.Labels())
require.Equal(t, tt.wantLbs, b.LabelsResult().Labels())
})
}
}
Expand All @@ -286,14 +282,16 @@ func TestReduceAndLabelFilter(t *testing.T) {
}{
{"empty", nil, NoopLabelFilter},
{"1", []LabelFilterer{NewBytesLabelFilter(LabelFilterEqual, "foo", 5)}, NewBytesLabelFilter(LabelFilterEqual, "foo", 5)},
{"2",
{
"2",
[]LabelFilterer{
NewBytesLabelFilter(LabelFilterEqual, "foo", 5),
NewBytesLabelFilter(LabelFilterGreaterThanOrEqual, "bar", 6),
},
NewAndLabelFilter(NewBytesLabelFilter(LabelFilterEqual, "foo", 5), NewBytesLabelFilter(LabelFilterGreaterThanOrEqual, "bar", 6)),
},
{"3",
{
"3",
[]LabelFilterer{
NewBytesLabelFilter(LabelFilterEqual, "foo", 5),
NewBytesLabelFilter(LabelFilterGreaterThanOrEqual, "bar", 6),
Expand Down
94 changes: 67 additions & 27 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type BaseLabelsBuilder struct {
// LabelsBuilder is the same as labels.Builder but tailored for this package.
type LabelsBuilder struct {
base labels.Labels
baseMap map[string]string
buf labels.Labels
currentResult LabelsResult
groupedResult LabelsResult

Expand Down Expand Up @@ -210,19 +212,33 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder {

// Labels returns the labels from the builder. If no modifications
// were made, the original labels are returned.
func (b *LabelsBuilder) Labels() labels.Labels {
func (b *LabelsBuilder) labels() labels.Labels {
b.buf = b.unsortedLabels(b.buf)
sort.Sort(b.buf)
return b.buf
}

func (b *LabelsBuilder) unsortedLabels(buf labels.Labels) labels.Labels {
if len(b.del) == 0 && len(b.add) == 0 {
if b.err == "" {
return b.base
if buf == nil {
buf = make(labels.Labels, 0, len(b.base)+1)
} else {
buf = buf[:0]
}
res := append(b.base.Copy(), labels.Label{Name: logqlmodel.ErrorLabel, Value: b.err})
sort.Sort(res)
return res
buf = append(buf, b.base...)
if b.err != "" {
buf = append(buf, labels.Label{Name: logqlmodel.ErrorLabel, Value: b.err})
}
return buf
}

// In the general case, labels are removed, modified or moved
// rather than added.
res := make(labels.Labels, 0, len(b.base))
if buf == nil {
buf = make(labels.Labels, 0, len(b.base)+len(b.add)+1)
} else {
buf = buf[:0]
}
Outer:
for _, l := range b.base {
for _, n := range b.del {
Expand All @@ -235,14 +251,30 @@ Outer:
continue Outer
}
}
res = append(res, l)
buf = append(buf, l)
}
res = append(res, b.add...)
buf = append(buf, b.add...)
if b.err != "" {
res = append(res, labels.Label{Name: logqlmodel.ErrorLabel, Value: b.err})
buf = append(buf, labels.Label{Name: logqlmodel.ErrorLabel, Value: b.err})
}
sort.Sort(res)

return buf
}

func (b *LabelsBuilder) Map() map[string]string {
if len(b.del) == 0 && len(b.add) == 0 && b.err == "" {
if b.baseMap == nil {
b.baseMap = b.base.Map()
}
return b.baseMap
}
b.buf = b.unsortedLabels(b.buf)
// todo should we also cache maps since limited by the result ?
// Maps also don't create a copy of the labels.
res := make(map[string]string, len(b.buf))
for _, l := range b.buf {
res[l.Name] = l.Value
}
return res
}

Expand All @@ -253,15 +285,15 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {
if len(b.del) == 0 && len(b.add) == 0 && b.err == "" {
return b.currentResult
}
return b.toResult(b.Labels())
return b.toResult(b.labels())
}

func (b *BaseLabelsBuilder) toResult(lbs labels.Labels) LabelsResult {
hash := b.hasher.Hash(lbs)
func (b *BaseLabelsBuilder) toResult(buf labels.Labels) LabelsResult {
hash := b.hasher.Hash(buf)
if cached, ok := b.resultCache[hash]; ok {
return cached
}
res := NewLabelsResult(lbs, hash)
res := NewLabelsResult(buf.Copy(), hash)
b.resultCache[hash] = res
return res
}
Expand Down Expand Up @@ -295,7 +327,11 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult {
}

func (b *LabelsBuilder) withResult() LabelsResult {
res := make(labels.Labels, 0, len(b.groups))
if b.buf == nil {
b.buf = make(labels.Labels, 0, len(b.groups))
} else {
b.buf = b.buf[:0]
}
Outer:
for _, g := range b.groups {
for _, n := range b.del {
Expand All @@ -305,26 +341,30 @@ Outer:
}
for _, la := range b.add {
if g == la.Name {
res = append(res, la)
b.buf = append(b.buf, la)
continue Outer
}
}
for _, l := range b.base {
if g == l.Name {
res = append(res, l)
b.buf = append(b.buf, l)
continue Outer
}
}
}
return b.toResult(res)
return b.toResult(b.buf)
}

func (b *LabelsBuilder) withoutResult() LabelsResult {
size := len(b.base) + len(b.add) - len(b.del) - len(b.groups)
if size < 0 {
size = 0
if b.buf == nil {
size := len(b.base) + len(b.add) - len(b.del) - len(b.groups)
if size < 0 {
size = 0
}
b.buf = make(labels.Labels, 0, size)
} else {
b.buf = b.buf[:0]
}
res := make(labels.Labels, 0, size)
Outer:
for _, l := range b.base {
for _, n := range b.del {
Expand All @@ -342,7 +382,7 @@ Outer:
continue Outer
}
}
res = append(res, l)
b.buf = append(b.buf, l)
}
OuterAdd:
for _, la := range b.add {
Expand All @@ -351,10 +391,10 @@ OuterAdd:
continue OuterAdd
}
}
res = append(res, la)
b.buf = append(b.buf, la)
}
sort.Sort(res)
return b.toResult(res)
sort.Sort(b.buf)
return b.toResult(b.buf)
}

func (b *LabelsBuilder) toBaseGroup() LabelsResult {
Expand Down
3 changes: 1 addition & 2 deletions pkg/logql/log/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestLabelsBuilder_LabelsError(t *testing.T) {
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
b.Reset()
b.SetErr("err")
lbsWithErr := b.Labels()
lbsWithErr := b.LabelsResult().Labels()
require.Equal(
t,
labels.Labels{
Expand Down Expand Up @@ -143,7 +143,6 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
}
sort.Sort(expected)
assertLabelResult(t, expected, b.GroupedLabels())

}

func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/logql/log/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Test_jsonParser_Parse(t *testing.T) {
b.Reset()
_, _ = j.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
require.Equal(t, tt.want, b.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestJSONExpressionParser(t *testing.T) {
b.Reset()
_, _ = j.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
require.Equal(t, tt.want, b.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -550,7 +550,7 @@ func Test_regexpParser_Parse(t *testing.T) {
b.Reset()
_, _ = tt.parser.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
require.Equal(t, tt.want, b.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -715,7 +715,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
b.Reset()
_, _ = p.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
require.Equal(t, tt.want, b.LabelsResult().Labels())
})
}
}
Expand Down Expand Up @@ -809,7 +809,7 @@ func Test_unpackParser_Parse(t *testing.T) {
copy := string(tt.line)
l, _ := j.Process(tt.line, b)
sort.Sort(tt.wantLbs)
require.Equal(t, tt.wantLbs, b.Labels())
require.Equal(t, tt.wantLbs, b.LabelsResult().Labels())
require.Equal(t, tt.wantLine, l)
require.Equal(t, string(tt.wantLine), string(l))
require.Equal(t, copy, string(tt.line), "the original log line should not be mutated")
Expand Down Expand Up @@ -877,7 +877,7 @@ func Test_PatternParser(t *testing.T) {
require.NoError(t, err)
_, _ = pp.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
require.Equal(t, tt.want, b.LabelsResult().Labels())
})
}
}

0 comments on commit 67e7285

Please sign in to comment.