Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve JSON parser and add labels parser hints. #3080

Merged
merged 3 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions pkg/logql/log/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ func (n notFilter) Filter(line []byte) bool {
}

// ToStage adapts the filter to a Stage whose processing function returns the
// line unchanged together with the result of n.Filter(line). The labels builder
// argument is ignored.
func (n notFilter) ToStage() Stage {
return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, n.Filter(line)
})
return StageFunc{
process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, n.Filter(line)
},
}
}

// newNotFilter creates a new filter which matches only if the base filter doesn't match.
Expand Down Expand Up @@ -81,9 +83,11 @@ func (a andFilter) Filter(line []byte) bool {
}

// ToStage adapts the filter to a Stage whose processing function returns the
// line unchanged together with the result of a.Filter(line). The labels builder
// argument is ignored.
func (a andFilter) ToStage() Stage {
return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, a.Filter(line)
})
return StageFunc{
process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, a.Filter(line)
},
}
}

type orFilter struct {
Expand Down Expand Up @@ -120,9 +124,11 @@ func (a orFilter) Filter(line []byte) bool {
}

// ToStage adapts the filter to a Stage whose processing function returns the
// line unchanged together with the result of a.Filter(line). The labels builder
// argument is ignored.
func (a orFilter) ToStage() Stage {
return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, a.Filter(line)
})
return StageFunc{
process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, a.Filter(line)
},
}
}

type regexpFilter struct {
Expand All @@ -148,9 +154,11 @@ func (r regexpFilter) Filter(line []byte) bool {
}

// ToStage adapts the filter to a Stage whose processing function returns the
// line unchanged together with the result of r.Filter(line). The labels builder
// argument is ignored.
func (r regexpFilter) ToStage() Stage {
return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, r.Filter(line)
})
return StageFunc{
process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, r.Filter(line)
},
}
}

type containsFilter struct {
Expand All @@ -166,9 +174,11 @@ func (l containsFilter) Filter(line []byte) bool {
}

// ToStage adapts the filter to a Stage whose processing function returns the
// line unchanged together with the result of l.Filter(line). The labels builder
// argument is ignored.
func (l containsFilter) ToStage() Stage {
return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, l.Filter(line)
})
return StageFunc{
process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, l.Filter(line)
},
}
}

func (l containsFilter) String() string {
Expand Down
61 changes: 61 additions & 0 deletions pkg/logql/log/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"
"strings"
"text/template"
"text/template/parse"
)

var (
Expand Down Expand Up @@ -86,6 +87,61 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
return res, true
}

// RequiredLabelNames returns the field names referenced by the line format
// template. The names are collected by walking lf.Root with listNodeFields and
// then passed through uniqueString, so a field used several times is reported once.
func (lf *LineFormatter) RequiredLabelNames() []string {
return uniqueString(listNodeFields(lf.Root))
}

// listNodeFields walks a template parse node and returns every field
// identifier it references, in traversal order and without deduplication.
// Action nodes contribute the fields of their pipeline, branch nodes
// (if/with/range) are delegated to listNodeFieldsFromBranch, and list nodes
// are descended into recursively.
func listNodeFields(node parse.Node) []string {
	var fields []string

	switch node.Type() {
	case parse.NodeAction:
		action := node.(*parse.ActionNode)
		fields = append(fields, listNodeFieldsFromPipe(action.Pipe)...)
	}

	// Returns nothing for nodes that are not if/with/range.
	fields = append(fields, listNodeFieldsFromBranch(node)...)

	list, isList := node.(*parse.ListNode)
	if !isList {
		return fields
	}
	for i := range list.Nodes {
		fields = append(fields, listNodeFields(list.Nodes[i])...)
	}
	return fields
}

// listNodeFieldsFromBranch returns the field identifiers referenced by an
// if, with or range node: first those of the branch pipeline, then those of
// the main list, then those of the else list. Any other node type yields nil.
func listNodeFieldsFromBranch(node parse.Node) []string {
	var branch *parse.BranchNode
	switch node.Type() {
	case parse.NodeIf:
		branch = &node.(*parse.IfNode).BranchNode
	case parse.NodeWith:
		branch = &node.(*parse.WithNode).BranchNode
	case parse.NodeRange:
		branch = &node.(*parse.RangeNode).BranchNode
	default:
		return nil
	}

	var fields []string
	if pipe := branch.Pipe; pipe != nil {
		fields = append(fields, listNodeFieldsFromPipe(pipe)...)
	}
	if body := branch.List; body != nil {
		fields = append(fields, listNodeFields(body)...)
	}
	if alt := branch.ElseList; alt != nil {
		fields = append(fields, listNodeFields(alt)...)
	}
	return fields
}

// listNodeFieldsFromPipe returns the field identifiers referenced by the
// commands of a template pipeline, in lexical order and without deduplication.
//
// A nil pipeline yields nil. Besides plain field arguments (`.foo`), it also
// descends into parenthesized sub-pipelines passed as arguments, e.g.
// `printf "%s" (.foo | lower)`, which the parser represents as a *parse.PipeNode
// inside the command arguments; without this the fields used there would be
// missed.
func listNodeFieldsFromPipe(p *parse.PipeNode) []string {
	if p == nil {
		return nil
	}
	var res []string
	for _, c := range p.Cmds {
		for _, a := range c.Args {
			switch arg := a.(type) {
			case *parse.FieldNode:
				res = append(res, arg.Ident...)
			case *parse.PipeNode:
				// Parenthesized pipeline used as an argument.
				res = append(res, listNodeFieldsFromPipe(arg)...)
			}
		}
	}
	return res
}

// LabelFmt is a configuration struct for formatting a label.
type LabelFmt struct {
Name string
Expand Down Expand Up @@ -187,6 +243,11 @@ func (lf *LabelsFormatter) Process(l []byte, lbs *LabelsBuilder) ([]byte, bool)
return l, true
}

// RequiredLabelNames currently reports no required label names: it always
// returns a nil slice.
// NOTE(review): the formatter's own label templates are not inspected here —
// confirm this is intentional and not a placeholder.
func (lf *LabelsFormatter) RequiredLabelNames() []string {
var names []string
return names
}

func trunc(c int, s string) string {
runes := []rune(s)
l := len(runes)
Expand Down
19 changes: 19 additions & 0 deletions pkg/logql/log/fmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,22 @@ func Test_substring(t *testing.T) {
})
}
}

func TestLineFormatter_RequiredLabelNames(t *testing.T) {
tests := []struct {
fmt string
want []string
}{
{`{{.foo}} and {{.bar}}`, []string{"foo", "bar"}},
{`{{ .foo | ToUpper | .buzz }} and {{.bar}}`, []string{"foo", "buzz", "bar"}},
{`{{ regexReplaceAllLiteral "(p)" .foo "${1}" }}`, []string{"foo"}},
{`{{ if .foo | hasSuffix "Ip" }} {{.bar}} {{end}}-{{ if .foo | hasSuffix "pw"}}no{{end}}`, []string{"foo", "bar"}},
{`{{with .foo}}{{printf "%q" .}} {{end}}`, []string{"foo"}},
{`{{with .foo}}{{printf "%q" .}} {{else}} {{ .buzz | lower }} {{end}}`, []string{"foo", "buzz"}},
}
for _, tt := range tests {
t.Run(tt.fmt, func(t *testing.T) {
require.Equal(t, tt.want, newMustLineFormatter(tt.fmt).RequiredLabelNames())
})
}
}
24 changes: 24 additions & 0 deletions pkg/logql/log/label_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ func (b *BinaryLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bo
return line, lok && rok
}

// RequiredLabelNames gathers the label names required by the left operand
// followed by those of the right operand, and returns the combined list after
// passing it through uniqueString.
func (b *BinaryLabelFilter) RequiredLabelNames() []string {
	// Start from a nil slice so the operands' own slices are never appended into.
	combined := append([]string(nil), b.Left.RequiredLabelNames()...)
	combined = append(combined, b.Right.RequiredLabelNames()...)
	return uniqueString(combined)
}

func (b *BinaryLabelFilter) String() string {
var sb strings.Builder
sb.WriteString("( ")
Expand All @@ -113,6 +120,7 @@ type noopLabelFilter struct{}

// String returns the empty string.
func (noopLabelFilter) String() string { return "" }
// Process returns the line unchanged and always reports true; lbs is not used.
func (noopLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { return line, true }
// RequiredLabelNames returns an empty, non-nil slice: this filter names no labels.
func (noopLabelFilter) RequiredLabelNames() []string { return []string{} }

// ReduceAndLabelFilter reduces multiple label filterers into one using a binary AND operation.
func ReduceAndLabelFilter(filters []LabelFilterer) LabelFilterer {
Expand Down Expand Up @@ -179,6 +187,10 @@ func (d *BytesLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, boo
}
}

// RequiredLabelNames returns a one-element slice holding the filter's label name, d.Name.
func (d *BytesLabelFilter) RequiredLabelNames() []string {
return []string{d.Name}
}

func (d *BytesLabelFilter) String() string {
b := strings.Map(func(r rune) rune {
if unicode.IsSpace(r) {
Expand Down Expand Up @@ -239,6 +251,10 @@ func (d *DurationLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte,
}
}

// RequiredLabelNames returns a one-element slice holding the filter's label name, d.Name.
func (d *DurationLabelFilter) RequiredLabelNames() []string {
return []string{d.Name}
}

func (d *DurationLabelFilter) String() string {
return fmt.Sprintf("%s%s%s", d.Name, d.Type, d.Value)
}
Expand Down Expand Up @@ -294,6 +310,10 @@ func (n *NumericLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, b

}

// RequiredLabelNames returns a one-element slice holding the filter's label name, n.Name.
func (n *NumericLabelFilter) RequiredLabelNames() []string {
return []string{n.Name}
}

func (n *NumericLabelFilter) String() string {
return fmt.Sprintf("%s%s%s", n.Name, n.Type, strconv.FormatFloat(n.Value, 'f', -1, 64))
}
Expand All @@ -318,3 +338,7 @@ func (s *StringLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bo
v, _ := lbs.Get(s.Name)
return line, s.Matches(v)
}

// RequiredLabelNames returns a one-element slice holding s.Name, the label
// whose value Process looks up and matches.
func (s *StringLabelFilter) RequiredLabelNames() []string {
return []string{s.Name}
}
26 changes: 17 additions & 9 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type BaseLabelsBuilder struct {
err string

groups []string
parserKeyHints []string // label key hints for metric queries that allows to limit parser extractions to only this list of labels.
without, noLabels bool

resultCache map[uint64]LabelsResult
Expand All @@ -84,21 +85,22 @@ type LabelsBuilder struct {
}

// NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results.
// groups, without and noLabels are stored as the grouping configuration.
// parserKeyHints is stored as-is and is what ParserLabelHints returns; pass nil
// when no hint is available.
func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder {
func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints []string, without, noLabels bool) *BaseLabelsBuilder {
return &BaseLabelsBuilder{
del: make([]string, 0, 5),
add: make([]labels.Label, 0, 16),
resultCache: make(map[uint64]LabelsResult),
hasher: newHasher(),
groups: groups,
noLabels: noLabels,
without: without,
del: make([]string, 0, 5),
add: make([]labels.Label, 0, 16),
resultCache: make(map[uint64]LabelsResult),
hasher: newHasher(),
groups: groups,
parserKeyHints: parserKeyHints,
noLabels: noLabels,
without: without,
}
}

// NewBaseLabelsBuilder creates a new base labels builder with no grouping and
// no parser key hints.
func NewBaseLabelsBuilder() *BaseLabelsBuilder {
return NewBaseLabelsBuilderWithGrouping(nil, false, false)
return NewBaseLabelsBuilderWithGrouping(nil, nil, false, false)
}

// ForLabels creates a labels builder for a given labels set as base.
Expand Down Expand Up @@ -129,6 +131,12 @@ func (b *LabelsBuilder) Reset() {
b.err = ""
}

// ParserLabelHints returns the limited list of labels that parsers are expected
// to extract for metric queries, exactly as given to the builder at construction.
// It returns nil when no hint can be given for label extraction.
func (b *BaseLabelsBuilder) ParserLabelHints() []string {
return b.parserKeyHints
}

// SetErr sets the error label.
func (b *LabelsBuilder) SetErr(err string) *LabelsBuilder {
b.err = err
Expand Down
8 changes: 4 additions & 4 deletions pkg/logql/log/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
labels.Label{Name: "cluster", Value: "us-central1"},
}
sort.Sort(lbs)
b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, false, false).ForLabels(lbs, lbs.Hash())
b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, nil, false, false).ForLabels(lbs, lbs.Hash())
b.Reset()
assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "loki"}}, b.GroupedLabels())
b.SetErr("err")
Expand All @@ -109,7 +109,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
// cached.
assertLabelResult(t, expected, b.GroupedLabels())

b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, false, false).ForLabels(lbs, lbs.Hash())
b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, false, false).ForLabels(lbs, lbs.Hash())
assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
b.Del("job")
Expand All @@ -118,7 +118,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
b.Set("namespace", "tempo")
assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())

b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, true, false).ForLabels(lbs, lbs.Hash())
b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, true, false).ForLabels(lbs, lbs.Hash())
b.Del("job")
b.Set("foo", "bar")
b.Set("job", "something")
Expand All @@ -130,7 +130,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
sort.Sort(expected)
assertLabelResult(t, expected, b.GroupedLabels())

b = NewBaseLabelsBuilderWithGrouping(nil, false, false).ForLabels(lbs, lbs.Hash())
b = NewBaseLabelsBuilderWithGrouping(nil, nil, false, false).ForLabels(lbs, lbs.Hash())
b.Set("foo", "bar")
b.Set("job", "something")
expected = labels.Labels{
Expand Down
22 changes: 18 additions & 4 deletions pkg/logql/log/metrics_extraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ type lineSampleExtractor struct {
// NewLineSampleExtractor creates a SampleExtractor from a LineExtractor.
// Multiple log stages are run before converting the log line.
func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) {
// Reduce once so the same stage is used both for hints and for extraction.
s := ReduceStages(stages)
// expectedLabels is handed to the base labels builder as parser key hints:
// the labels named by the stages plus the grouping labels, deduplicated.
var expectedLabels []string
// With `without` grouping the hints stay nil — presumably because any label
// outside the excluded set may be part of the result; TODO confirm.
if !without {
expectedLabels = append(expectedLabels, s.RequiredLabelNames()...)
expectedLabels = uniqueString(append(expectedLabels, groups...))
}
return &lineSampleExtractor{
Stage: ReduceStages(stages),
Stage: s,
LineExtractor: ex,
baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels),
baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, expectedLabels, without, noLabels),
streamExtractors: make(map[uint64]StreamSampleExtractor),
}, nil
}
Expand Down Expand Up @@ -132,12 +138,20 @@ func LabelExtractorWithStages(
groups = append(groups, labelName)
sort.Strings(groups)
}
preStage := ReduceStages(preStages)
var expectedLabels []string
if !without {
expectedLabels = append(expectedLabels, preStage.RequiredLabelNames()...)
expectedLabels = append(expectedLabels, groups...)
expectedLabels = append(expectedLabels, postFilter.RequiredLabelNames()...)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
expectedLabels = uniqueString(expectedLabels)
}
return &labelSampleExtractor{
preStage: ReduceStages(preStages),
preStage: preStage,
conversionFn: convFn,
labelName: labelName,
postFilter: postFilter,
baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels),
baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, expectedLabels, without, noLabels),
streamExtractors: make(map[uint64]StreamSampleExtractor),
}, nil
}
Expand Down
Loading