From 0bf372530274bc06d61a284026a386260336b3d7 Mon Sep 17 00:00:00 2001
From: Cyril Tovena <cyril.tovena@gmail.com>
Date: Tue, 15 Dec 2020 10:24:41 +0100
Subject: [PATCH 1/3] Improve JSON parser and add labels parser hints.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This is a rework of the JSON parser, which now iterates over the input instead of using reflection, avoiding allocations.

I've also implemented a way to infer hints about which labels need to be parsed when running metric queries. This means we can extract only the few labels required from the log line.

benchcmp

```
❯ benchcmp  before.txt after.txt5
benchmark                                      old ns/op     new ns/op     delta
Benchmark_Parser/json/no_labels_hints-16       9889          3281          -66.82%
Benchmark_Parser/logfmt/no_labels_hints-16     1624          1671          +2.89%
Benchmark_Parser/logfmt/labels_hints-16        1601          790           -50.66%
benchmark                                      old allocs     new allocs     delta
Benchmark_Parser/json/no_labels_hints-16       139            56             -59.71%
Benchmark_Parser/logfmt/no_labels_hints-16     31             31             +0.00%
Benchmark_Parser/logfmt/labels_hints-16        31             5              -83.87%
benchmark                                      old bytes     new bytes     delta
Benchmark_Parser/json/no_labels_hints-16       3671          912           -75.16%
Benchmark_Parser/logfmt/no_labels_hints-16     464           464           +0.00%
Benchmark_Parser/logfmt/labels_hints-16        464           144           -68.97%
```

I've experienced a 2x to 4x improvement in my cluster.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
---
 pkg/logql/log/filter.go             |  40 +++---
 pkg/logql/log/fmt.go                |  61 +++++++++
 pkg/logql/log/fmt_test.go           |  19 +++
 pkg/logql/log/label_filter.go       |  24 ++++
 pkg/logql/log/labels.go             |  26 ++--
 pkg/logql/log/labels_test.go        |   8 +-
 pkg/logql/log/metrics_extraction.go |  17 ++-
 pkg/logql/log/parser.go             | 200 +++++++++++++++++++++-------
 pkg/logql/log/parser_test.go        |  75 ++++++++---
 pkg/logql/log/pipeline.go           |  41 ++++--
 pkg/logql/log/util.go               |  35 +++++
 pkg/logql/log/util_test.go          |  24 ++++
 12 files changed, 461 insertions(+), 109 deletions(-)
 create mode 100644 pkg/logql/log/util.go
 create mode 100644 pkg/logql/log/util_test.go

diff --git a/pkg/logql/log/filter.go b/pkg/logql/log/filter.go
index 53851d009354b..8378247df2f17 100644
--- a/pkg/logql/log/filter.go
+++ b/pkg/logql/log/filter.go
@@ -39,9 +39,11 @@ func (n notFilter) Filter(line []byte) bool {
 }
 
 func (n notFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
-		return line, n.Filter(line)
-	})
+	return StageFunc{
+		process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
+			return line, n.Filter(line)
+		},
+	}
 }
 
 // newNotFilter creates a new filter which matches only if the base filter doesn't match.
@@ -81,9 +83,11 @@ func (a andFilter) Filter(line []byte) bool {
 }
 
 func (a andFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
-		return line, a.Filter(line)
-	})
+	return StageFunc{
+		process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
+			return line, a.Filter(line)
+		},
+	}
 }
 
 type orFilter struct {
@@ -120,9 +124,11 @@ func (a orFilter) Filter(line []byte) bool {
 }
 
 func (a orFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
-		return line, a.Filter(line)
-	})
+	return StageFunc{
+		process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
+			return line, a.Filter(line)
+		},
+	}
 }
 
 type regexpFilter struct {
@@ -148,9 +154,11 @@ func (r regexpFilter) Filter(line []byte) bool {
 }
 
 func (r regexpFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
-		return line, r.Filter(line)
-	})
+	return StageFunc{
+		process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
+			return line, r.Filter(line)
+		},
+	}
 }
 
 type containsFilter struct {
@@ -166,9 +174,11 @@ func (l containsFilter) Filter(line []byte) bool {
 }
 
 func (l containsFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
-		return line, l.Filter(line)
-	})
+	return StageFunc{
+		process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
+			return line, l.Filter(line)
+		},
+	}
 }
 
 func (l containsFilter) String() string {
diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go
index 745aed711974c..e30d92a823346 100644
--- a/pkg/logql/log/fmt.go
+++ b/pkg/logql/log/fmt.go
@@ -6,6 +6,7 @@ import (
 	"regexp"
 	"strings"
 	"text/template"
+	"text/template/parse"
 )
 
 var (
@@ -86,6 +87,61 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
 	return res, true
 }
 
+func (lf *LineFormatter) RequiredLabelNames() []string {
+	return uniqueString(listNodeFields(lf.Root))
+}
+
+func listNodeFields(node parse.Node) []string {
+	var res []string
+	if node.Type() == parse.NodeAction {
+		res = append(res, listNodeFieldsFromPipe(node.(*parse.ActionNode).Pipe)...)
+	}
+	res = append(res, listNodeFieldsFromBranch(node)...)
+	if ln, ok := node.(*parse.ListNode); ok {
+		for _, n := range ln.Nodes {
+			res = append(res, listNodeFields(n)...)
+		}
+	}
+	return res
+}
+
+func listNodeFieldsFromBranch(node parse.Node) []string {
+	var res []string
+	var b parse.BranchNode
+	switch node.Type() {
+	case parse.NodeIf:
+		b = node.(*parse.IfNode).BranchNode
+	case parse.NodeWith:
+		b = node.(*parse.WithNode).BranchNode
+	case parse.NodeRange:
+		b = node.(*parse.RangeNode).BranchNode
+	default:
+		return res
+	}
+	if b.Pipe != nil {
+		res = append(res, listNodeFieldsFromPipe(b.Pipe)...)
+	}
+	if b.List != nil {
+		res = append(res, listNodeFields(b.List)...)
+	}
+	if b.ElseList != nil {
+		res = append(res, listNodeFields(b.ElseList)...)
+	}
+	return res
+}
+
+func listNodeFieldsFromPipe(p *parse.PipeNode) []string {
+	var res []string
+	for _, c := range p.Cmds {
+		for _, a := range c.Args {
+			if f, ok := a.(*parse.FieldNode); ok {
+				res = append(res, f.Ident...)
+			}
+		}
+	}
+	return res
+}
+
 // LabelFmt is a configuration struct for formatting a label.
 type LabelFmt struct {
 	Name  string
@@ -187,6 +243,11 @@ func (lf *LabelsFormatter) Process(l []byte, lbs *LabelsBuilder) ([]byte, bool)
 	return l, true
 }
 
+func (lf *LabelsFormatter) RequiredLabelNames() []string {
+	var names []string
+	return names
+}
+
 func trunc(c int, s string) string {
 	runes := []rune(s)
 	l := len(runes)
diff --git a/pkg/logql/log/fmt_test.go b/pkg/logql/log/fmt_test.go
index 784c1434983ff..0e9a3e3a8483c 100644
--- a/pkg/logql/log/fmt_test.go
+++ b/pkg/logql/log/fmt_test.go
@@ -290,3 +290,22 @@ func Test_substring(t *testing.T) {
 		})
 	}
 }
+
+func TestLineFormatter_RequiredLabelNames(t *testing.T) {
+	tests := []struct {
+		fmt  string
+		want []string
+	}{
+		{`{{.foo}} and {{.bar}}`, []string{"foo", "bar"}},
+		{`{{ .foo | ToUpper | .buzz }} and {{.bar}}`, []string{"foo", "buzz", "bar"}},
+		{`{{ regexReplaceAllLiteral "(p)" .foo "${1}" }}`, []string{"foo"}},
+		{`{{ if  .foo | hasSuffix "Ip" }} {{.bar}} {{end}}-{{ if  .foo | hasSuffix "pw"}}no{{end}}`, []string{"foo", "bar"}},
+		{`{{with .foo}}{{printf "%q" .}} {{end}}`, []string{"foo"}},
+		{`{{with .foo}}{{printf "%q" .}} {{else}} {{ .buzz | lower }} {{end}}`, []string{"foo", "buzz"}},
+	}
+	for _, tt := range tests {
+		t.Run(tt.fmt, func(t *testing.T) {
+			require.Equal(t, tt.want, newMustLineFormatter(tt.fmt).RequiredLabelNames())
+		})
+	}
+}
diff --git a/pkg/logql/log/label_filter.go b/pkg/logql/log/label_filter.go
index 5edca0f4f49e8..d433e48a9753f 100644
--- a/pkg/logql/log/label_filter.go
+++ b/pkg/logql/log/label_filter.go
@@ -95,6 +95,13 @@ func (b *BinaryLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bo
 	return line, lok && rok
 }
 
+func (b *BinaryLabelFilter) RequiredLabelNames() []string {
+	var names []string
+	names = append(names, b.Left.RequiredLabelNames()...)
+	names = append(names, b.Right.RequiredLabelNames()...)
+	return uniqueString(names)
+}
+
 func (b *BinaryLabelFilter) String() string {
 	var sb strings.Builder
 	sb.WriteString("( ")
@@ -113,6 +120,7 @@ type noopLabelFilter struct{}
 
 func (noopLabelFilter) String() string                                         { return "" }
 func (noopLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { return line, true }
+func (noopLabelFilter) RequiredLabelNames() []string                           { return []string{} }
 
 // ReduceAndLabelFilter Reduces multiple label filterer into one using binary and operation.
 func ReduceAndLabelFilter(filters []LabelFilterer) LabelFilterer {
@@ -179,6 +187,10 @@ func (d *BytesLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, boo
 	}
 }
 
+func (d *BytesLabelFilter) RequiredLabelNames() []string {
+	return []string{d.Name}
+}
+
 func (d *BytesLabelFilter) String() string {
 	b := strings.Map(func(r rune) rune {
 		if unicode.IsSpace(r) {
@@ -239,6 +251,10 @@ func (d *DurationLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte,
 	}
 }
 
+func (d *DurationLabelFilter) RequiredLabelNames() []string {
+	return []string{d.Name}
+}
+
 func (d *DurationLabelFilter) String() string {
 	return fmt.Sprintf("%s%s%s", d.Name, d.Type, d.Value)
 }
@@ -294,6 +310,10 @@ func (n *NumericLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, b
 
 }
 
+func (n *NumericLabelFilter) RequiredLabelNames() []string {
+	return []string{n.Name}
+}
+
 func (n *NumericLabelFilter) String() string {
 	return fmt.Sprintf("%s%s%s", n.Name, n.Type, strconv.FormatFloat(n.Value, 'f', -1, 64))
 }
@@ -318,3 +338,7 @@ func (s *StringLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bo
 	v, _ := lbs.Get(s.Name)
 	return line, s.Matches(v)
 }
+
+func (s *StringLabelFilter) RequiredLabelNames() []string {
+	return []string{s.Name}
+}
diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index df63afc86e790..e32dd8f870d3b 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -68,6 +68,7 @@ type BaseLabelsBuilder struct {
 	err string
 
 	groups            []string
+	parserKeyHints    []string // label key hints for metric queries that allows to limit parser extractions to only this list of labels.
 	without, noLabels bool
 
 	resultCache map[uint64]LabelsResult
@@ -84,21 +85,22 @@ type LabelsBuilder struct {
 }
 
 // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results.
-func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder {
+func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints []string, without, noLabels bool) *BaseLabelsBuilder {
 	return &BaseLabelsBuilder{
-		del:         make([]string, 0, 5),
-		add:         make([]labels.Label, 0, 16),
-		resultCache: make(map[uint64]LabelsResult),
-		hasher:      newHasher(),
-		groups:      groups,
-		noLabels:    noLabels,
-		without:     without,
+		del:            make([]string, 0, 5),
+		add:            make([]labels.Label, 0, 16),
+		resultCache:    make(map[uint64]LabelsResult),
+		hasher:         newHasher(),
+		groups:         groups,
+		parserKeyHints: parserKeyHints,
+		noLabels:       noLabels,
+		without:        without,
 	}
 }
 
 // NewLabelsBuilder creates a new base labels builder.
 func NewBaseLabelsBuilder() *BaseLabelsBuilder {
-	return NewBaseLabelsBuilderWithGrouping(nil, false, false)
+	return NewBaseLabelsBuilderWithGrouping(nil, nil, false, false)
 }
 
 // ForLabels creates a labels builder for a given labels set as base.
@@ -129,6 +131,12 @@ func (b *LabelsBuilder) Reset() {
 	b.err = ""
 }
 
+// ExpectedLabels returns a limited list of expected labels to extract for metric queries.
+// Returns nil when it's impossible to hint labels extractions.
+func (b *BaseLabelsBuilder) ParserLabelHints() []string {
+	return b.parserKeyHints
+}
+
 // SetErr sets the error label.
 func (b *LabelsBuilder) SetErr(err string) *LabelsBuilder {
 	b.err = err
diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go
index 493fb617bb517..69c446982ae27 100644
--- a/pkg/logql/log/labels_test.go
+++ b/pkg/logql/log/labels_test.go
@@ -88,7 +88,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
 		labels.Label{Name: "cluster", Value: "us-central1"},
 	}
 	sort.Sort(lbs)
-	b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, false, false).ForLabels(lbs, lbs.Hash())
+	b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, nil, false, false).ForLabels(lbs, lbs.Hash())
 	b.Reset()
 	assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "loki"}}, b.GroupedLabels())
 	b.SetErr("err")
@@ -109,7 +109,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
 	// cached.
 	assertLabelResult(t, expected, b.GroupedLabels())
 
-	b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, false, false).ForLabels(lbs, lbs.Hash())
+	b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, false, false).ForLabels(lbs, lbs.Hash())
 	assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
 	assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
 	b.Del("job")
@@ -118,7 +118,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
 	b.Set("namespace", "tempo")
 	assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
 
-	b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, true, false).ForLabels(lbs, lbs.Hash())
+	b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, true, false).ForLabels(lbs, lbs.Hash())
 	b.Del("job")
 	b.Set("foo", "bar")
 	b.Set("job", "something")
@@ -130,7 +130,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
 	sort.Sort(expected)
 	assertLabelResult(t, expected, b.GroupedLabels())
 
-	b = NewBaseLabelsBuilderWithGrouping(nil, false, false).ForLabels(lbs, lbs.Hash())
+	b = NewBaseLabelsBuilderWithGrouping(nil, nil, false, false).ForLabels(lbs, lbs.Hash())
 	b.Set("foo", "bar")
 	b.Set("job", "something")
 	expected = labels.Labels{
diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go
index 58cacf503816d..861bd7b4df74e 100644
--- a/pkg/logql/log/metrics_extraction.go
+++ b/pkg/logql/log/metrics_extraction.go
@@ -48,10 +48,15 @@ type lineSampleExtractor struct {
 // NewLineSampleExtractor creates a SampleExtractor from a LineExtractor.
 // Multiple log stages are run before converting the log line.
 func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) {
+	s := ReduceStages(stages)
+	var expectedLabels []string
+	if !without {
+		expectedLabels = uniqueString(append(s.RequiredLabelNames(), groups...))
+	}
 	return &lineSampleExtractor{
-		Stage:            ReduceStages(stages),
+		Stage:            s,
 		LineExtractor:    ex,
-		baseBuilder:      NewBaseLabelsBuilderWithGrouping(groups, without, noLabels),
+		baseBuilder:      NewBaseLabelsBuilderWithGrouping(groups, expectedLabels, without, noLabels),
 		streamExtractors: make(map[uint64]StreamSampleExtractor),
 	}, nil
 }
@@ -132,12 +137,18 @@ func LabelExtractorWithStages(
 		groups = append(groups, labelName)
 		sort.Strings(groups)
 	}
+	var expectedLabels []string
+	if !without {
+		expectedLabels = append(postFilter.RequiredLabelNames(), groups...)
+		expectedLabels = append(expectedLabels, postFilter.RequiredLabelNames()...)
+		expectedLabels = uniqueString(expectedLabels)
+	}
 	return &labelSampleExtractor{
 		preStage:         ReduceStages(preStages),
 		conversionFn:     convFn,
 		labelName:        labelName,
 		postFilter:       postFilter,
-		baseBuilder:      NewBaseLabelsBuilderWithGrouping(groups, without, noLabels),
+		baseBuilder:      NewBaseLabelsBuilderWithGrouping(groups, expectedLabels, without, noLabels),
 		streamExtractors: make(map[uint64]StreamSampleExtractor),
 	}, nil
 }
diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go
index aa9ef3d28edab..caa83f518b7b3 100644
--- a/pkg/logql/log/parser.go
+++ b/pkg/logql/log/parser.go
@@ -3,8 +3,8 @@ package log
 import (
 	"errors"
 	"fmt"
+	"io"
 	"regexp"
-	"strconv"
 	"strings"
 
 	"github.com/grafana/loki/pkg/logql/log/logfmt"
@@ -14,8 +14,10 @@ import (
 )
 
 const (
-	jsonSpacer      = "_"
+	jsonSpacer      = '_'
 	duplicateSuffix = "_extracted"
+	trueString      = "true"
+	falseString     = "false"
 )
 
 var (
@@ -26,67 +28,169 @@ var (
 	errMissingCapture = errors.New("at least one named capture must be supplied")
 )
 
-func addLabel(lbs *LabelsBuilder, key, value string) {
-	key = sanitizeKey(key)
-	if lbs.BaseHas(key) {
-		key = fmt.Sprintf("%s%s", key, duplicateSuffix)
+type JSONParser struct {
+	buf []byte // buffer used to build json keys
+	lbs *LabelsBuilder
+}
+
+// NewJSONParser creates a log stage that can parse a json log line and add properties as labels.
+func NewJSONParser() *JSONParser {
+	return &JSONParser{
+		buf: make([]byte, 0, 1024),
 	}
-	lbs.Set(key, value)
 }
 
-func sanitizeKey(key string) string {
-	if len(key) == 0 {
-		return key
+func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
+	it := jsoniter.ConfigFastest.BorrowIterator(line)
+	defer jsoniter.ConfigFastest.ReturnIterator(it)
+
+	// reset the state.
+	j.buf = j.buf[:0]
+	j.lbs = lbs
+
+	if err := j.readObject(it); err != nil {
+		lbs.SetErr(errJSON)
+		return line, true
+	}
+	return line, true
+}
+
+func (j *JSONParser) readObject(it *jsoniter.Iterator) error {
+	// we only care about object and values.
+	if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue {
+		return errors.New("not a json object")
 	}
-	key = strings.TrimSpace(key)
-	if key[0] >= '0' && key[0] <= '9' {
-		key = "_" + key
+	_ = it.ReadMapCB(j.parseMap(""))
+	if it.Error != nil && it.Error != io.EOF {
+		return it.Error
 	}
-	return strings.Map(func(r rune) rune {
-		if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_' || (r >= '0' && r <= '9') {
-			return r
+	return nil
+}
+
+func (j *JSONParser) parseMap(prefix string) func(iter *jsoniter.Iterator, field string) bool {
+	return func(iter *jsoniter.Iterator, field string) bool {
+		switch iter.WhatIsNext() {
+		// are we looking at a value that needs to be added ?
+		case jsoniter.StringValue, jsoniter.NumberValue, jsoniter.BoolValue:
+			j.parseLabelValue(iter, prefix, field)
+		// Or another new object based on a prefix.
+		case jsoniter.ObjectValue:
+			if key, ok := j.nextKeyPrefix(prefix, field); ok {
+				return iter.ReadMapCB(j.parseMap(key))
+			}
+			// If this keys is not expected we skip the object
+			iter.Skip()
+		default:
+			iter.Skip()
 		}
-		return '_'
-	}, key)
+		return true
+	}
 }
 
-type JSONParser struct{}
+func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) {
+	// first time time we add return the field as prefix.
+	if len(prefix) == 0 {
+		field = sanitizeLabelKey(field, true)
+		if isValidKeyPrefix(field, j.lbs.ParserLabelHints()) {
+			return field, true
+		}
+		return "", false
+	}
+	// otherwise we build the prefix and check using the buffer
+	j.buf = j.buf[:0]
+	j.buf = append(j.buf, prefix...)
+	j.buf = append(j.buf, byte(jsonSpacer))
+	j.buf = append(j.buf, sanitizeLabelKey(field, false)...)
+	// if matches keep going
+	if isValidKeyPrefix(string(j.buf), j.lbs.ParserLabelHints()) {
+		return string(j.buf), true
+	}
+	return "", false
 
-// NewJSONParser creates a log stage that can parse a json log line and add properties as labels.
-func NewJSONParser() *JSONParser {
-	return &JSONParser{}
 }
 
-func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
-	data := map[string]interface{}{}
-	err := jsoniter.ConfigFastest.Unmarshal(line, &data)
-	if err != nil {
-		lbs.SetErr(errJSON)
-		return line, true
+// isValidKeyPrefix extract an object if the prefix is valid
+func isValidKeyPrefix(objectprefix string, hints []string) bool {
+	if len(hints) == 0 {
+		return true
 	}
-	parseMap("", data, lbs)
-	return line, true
+	for _, k := range hints {
+		if strings.HasPrefix(k, objectprefix) {
+			return true
+		}
+	}
+	return false
+}
+
+func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field string) {
+	// the first time we use the field as label key.
+	if len(prefix) == 0 {
+		field = sanitizeLabelKey(field, true)
+		if !shouldExtractKey(field, j.lbs.ParserLabelHints()) {
+			// we can skip the value
+			iter.Skip()
+			return
+
+		}
+		if j.lbs.BaseHas(field) {
+			field = field + duplicateSuffix
+		}
+		j.lbs.Set(field, readValue(iter))
+		return
+
+	}
+	// other we build the label key using the buffer
+	j.buf = j.buf[:0]
+	j.buf = append(j.buf, prefix...)
+	j.buf = append(j.buf, byte(jsonSpacer))
+	j.buf = append(j.buf, sanitizeLabelKey(field, false)...)
+	if j.lbs.BaseHas(string(j.buf)) {
+		j.buf = append(j.buf, duplicateSuffix...)
+	}
+	if !shouldExtractKey(string(j.buf), j.lbs.ParserLabelHints()) {
+		iter.Skip()
+		return
+	}
+	j.lbs.Set(string(j.buf), readValue(iter))
 }
 
-func parseMap(prefix string, data map[string]interface{}, lbs *LabelsBuilder) {
-	for key, val := range data {
-		switch concrete := val.(type) {
-		case map[string]interface{}:
-			parseMap(jsonKey(prefix, key), concrete, lbs)
-		case string:
-			addLabel(lbs, jsonKey(prefix, key), concrete)
-		case float64:
-			f := strconv.FormatFloat(concrete, 'f', -1, 64)
-			addLabel(lbs, jsonKey(prefix, key), f)
+func (j *JSONParser) RequiredLabelNames() []string { return []string{} }
+
+func readValue(iter *jsoniter.Iterator) string {
+	switch iter.WhatIsNext() {
+	case jsoniter.StringValue:
+		return iter.ReadString()
+	case jsoniter.NumberValue:
+		return iter.ReadNumber().String()
+	case jsoniter.BoolValue:
+		if iter.ReadBool() {
+			return trueString
 		}
+		return falseString
+	default:
+		iter.Skip()
+		return ""
 	}
 }
 
-func jsonKey(prefix, key string) string {
-	if prefix == "" {
-		return key
+func shouldExtractKey(key string, hints []string) bool {
+	if len(hints) == 0 {
+		return true
+	}
+	for _, k := range hints {
+		if k == key {
+			return true
+		}
+	}
+	return false
+}
+
+func addLabel(lbs *LabelsBuilder, key, value string) {
+	key = sanitizeLabelKey(key, true)
+	if lbs.BaseHas(key) {
+		key = fmt.Sprintf("%s%s", key, duplicateSuffix)
 	}
-	return fmt.Sprintf("%s%s%s", prefix, jsonSpacer, key)
+	lbs.Set(key, value)
 }
 
 type RegexpParser struct {
@@ -144,6 +248,8 @@ func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	return line, true
 }
 
+func (r *RegexpParser) RequiredLabelNames() []string { return []string{} }
+
 type LogfmtParser struct {
 	dec *logfmt.Decoder
 }
@@ -158,8 +264,10 @@ func NewLogfmtParser() *LogfmtParser {
 
 func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	l.dec.Reset(line)
-
 	for l.dec.ScanKeyval() {
+		if !shouldExtractKey(string(l.dec.Key()), lbs.ParserLabelHints()) {
+			continue
+		}
 		key := string(l.dec.Key())
 		val := string(l.dec.Value())
 		addLabel(lbs, key, val)
@@ -170,3 +278,5 @@ func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	}
 	return line, true
 }
+
+func (l *LogfmtParser) RequiredLabelNames() []string { return []string{} }
diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go
index 659b7ce17a8a0..d2b44940725d6 100644
--- a/pkg/logql/log/parser_test.go
+++ b/pkg/logql/log/parser_test.go
@@ -61,7 +61,7 @@ func Test_jsonParser_Parse(t *testing.T) {
 		},
 		{
 			"duplicate extraction",
-			[]byte(`{"app":"foo","namespace":"prod","pod":{"uuid":"foo","deployment":{"ref":"foobar"}}}`),
+			[]byte(`{"app":"foo","namespace":"prod","pod":{"uuid":"foo","deployment":{"ref":"foobar"}},"next":{"err":false}}`),
 			labels.Labels{
 				{Name: "app", Value: "bar"},
 			},
@@ -70,6 +70,7 @@ func Test_jsonParser_Parse(t *testing.T) {
 				{Name: "app_extracted", Value: "foo"},
 				{Name: "namespace", Value: "prod"},
 				{Name: "pod_uuid", Value: "foo"},
+				{Name: "next_err", Value: "false"},
 				{Name: "pod_deployment_ref", Value: "foobar"},
 			},
 		},
@@ -86,6 +87,57 @@ func Test_jsonParser_Parse(t *testing.T) {
 	}
 }
 
+func Benchmark_Parser(b *testing.B) {
+
+	lbs := labels.Labels{
+		{Name: "cluster", Value: "qa-us-central1"},
+		{Name: "namespace", Value: "qa"},
+		{Name: "filename", Value: "/var/log/pods/ingress-nginx_nginx-ingress-controller-7745855568-blq6t_1f8962ef-f858-4188-a573-ba276a3cacc3/ingress-nginx/0.log"},
+		{Name: "job", Value: "ingress-nginx/nginx-ingress-controller"},
+		{Name: "name", Value: "nginx-ingress-controller"},
+		{Name: "pod", Value: "nginx-ingress-controller-7745855568-blq6t"},
+		{Name: "pod_template_hash", Value: "7745855568"},
+		{Name: "stream", Value: "stdout"},
+	}
+
+	jsonLine := `{"proxy_protocol_addr": "","remote_addr": "3.112.221.14","remote_user": "","upstream_addr": "10.12.15.234:5000","the_real_ip": "3.112.221.14","timestamp": "2020-12-11T16:20:07+00:00","protocol": "HTTP/1.1","upstream_name": "hosted-grafana-hosted-grafana-api-80","request": {"id": "c8eacb6053552c0cd1ae443bc660e140","time": "0.001","method" : "GET","host": "hg-api-qa-us-central1.grafana.net","uri": "/","size" : "128","user_agent": "worldping-api","referer": ""},"response": {"status": 200,"upstream_status": "200","size": "1155","size_sent": "265","latency_seconds": "0.001"}}`
+	logfmtLine := `level=info ts=2020-12-14T21:25:20.947307459Z caller=metrics.go:83 org_id=29 traceID=c80e691e8db08e2 latency=fast query="sum by (object_name) (rate(({container=\"metrictank\", cluster=\"hm-us-east2\"} |= \"PANIC\")[5m]))" query_type=metric range_type=range length=5m0s step=15s duration=322.623724ms status=200 throughput=1.2GB total_bytes=375MB`
+	nginxline := `10.1.0.88 - - [14/Dec/2020:22:56:24 +0000] "GET /static/img/about/bob.jpg HTTP/1.1" 200 60755 "https://grafana.com/go/observabilitycon/grafana-the-open-and-composable-observability-platform/?tech=ggl-o&pg=oss-graf&plcmt=hero-txt" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.1 Safari/605.1.15" "123.123.123.123, 35.35.122.223" "TLSv1.3"`
+
+	for _, tt := range []struct {
+		name            string
+		line            string
+		s               Stage
+		LabelParseHints []string //  hints to reduce label extractions.
+	}{
+		{"json", jsonLine, NewJSONParser(), []string{"response_latency_seconds"}},
+		{"logfmt", logfmtLine, NewLogfmtParser(), []string{"info", "throughput", "org_id"}},
+		{"regex greedy", nginxline, mustNewRegexParser(`GET (?P<path>.*?)/\?`), []string{"path"}},
+		{"regex status digits", nginxline, mustNewRegexParser(`HTTP/1.1" (?P<statuscode>\d{3}) `), []string{"statuscode"}},
+	} {
+		b.Run(tt.name, func(b *testing.B) {
+			line := []byte(tt.line)
+			b.Run("no labels hints", func(b *testing.B) {
+				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
+				for n := 0; n < b.N; n++ {
+					builder.Reset()
+					_, _ = tt.s.Process(line, builder)
+				}
+			})
+
+			b.Run("labels hints", func(b *testing.B) {
+				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
+				builder.parserKeyHints = tt.LabelParseHints
+				for n := 0; n < b.N; n++ {
+					builder.Reset()
+					_, _ = tt.s.Process(line, builder)
+				}
+			})
+		})
+	}
+
+}
+
 func TestNewRegexpParser(t *testing.T) {
 	tests := []struct {
 		name    string
@@ -289,24 +341,3 @@ func Test_logfmtParser_Parse(t *testing.T) {
 		})
 	}
 }
-
-func Test_sanitizeKey(t *testing.T) {
-	tests := []struct {
-		key  string
-		want string
-	}{
-		{"1", "_1"},
-		{"1 1 1", "_1_1_1"},
-		{"abc", "abc"},
-		{"$a$bc", "_a_bc"},
-		{"$a$bc", "_a_bc"},
-		{"   1 1 1  \t", "_1_1_1"},
-	}
-	for _, tt := range tests {
-		t.Run(tt.key, func(t *testing.T) {
-			if got := sanitizeKey(tt.key); got != tt.want {
-				t.Errorf("sanitizeKey() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go
index 3dafcd77f9a87..874b3b540dd13 100644
--- a/pkg/logql/log/pipeline.go
+++ b/pkg/logql/log/pipeline.go
@@ -28,6 +28,7 @@ type StreamPipeline interface {
 // return the line unchanged or allocate a new line.
 type Stage interface {
 	Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
+	RequiredLabelNames() []string
 }
 
 // NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is.
@@ -74,11 +75,22 @@ type noopStage struct{}
 func (noopStage) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	return line, true
 }
+func (noopStage) RequiredLabelNames() []string { return []string{} }
 
-type StageFunc func(line []byte, lbs *LabelsBuilder) ([]byte, bool)
+type StageFunc struct {
+	process        func(line []byte, lbs *LabelsBuilder) ([]byte, bool)
+	requiredLabels []string
+}
 
 func (fn StageFunc) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
-	return fn(line, lbs)
+	return fn.process(line, lbs)
+}
+
+func (fn StageFunc) RequiredLabelNames() []string {
+	if fn.requiredLabels == nil {
+		return []string{}
+	}
+	return fn.requiredLabels
 }
 
 // pipeline is a combinations of multiple stages.
@@ -147,16 +159,23 @@ func ReduceStages(stages []Stage) Stage {
 	if len(stages) == 0 {
 		return NoopStage
 	}
-	return StageFunc(func(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
-		var ok bool
-		for _, p := range stages {
-			line, ok = p.Process(line, lbs)
-			if !ok {
-				return nil, false
+	var requiredLabelNames []string
+	for _, s := range stages {
+		requiredLabelNames = append(requiredLabelNames, s.RequiredLabelNames()...)
+	}
+	return StageFunc{
+		process: func(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
+			var ok bool
+			for _, p := range stages {
+				line, ok = p.Process(line, lbs)
+				if !ok {
+					return nil, false
+				}
 			}
-		}
-		return line, true
-	})
+			return line, true
+		},
+		requiredLabels: requiredLabelNames,
+	}
 }
 
 func unsafeGetBytes(s string) []byte {
diff --git a/pkg/logql/log/util.go b/pkg/logql/log/util.go
new file mode 100644
index 0000000000000..d12ed21525f74
--- /dev/null
+++ b/pkg/logql/log/util.go
@@ -0,0 +1,35 @@
+package log
+
+import (
+	"strings"
+)
+
+func uniqueString(s []string) []string {
+	unique := make(map[string]bool, len(s))
+	us := make([]string, len(unique))
+	for _, elem := range s {
+		if len(elem) != 0 {
+			if !unique[elem] {
+				us = append(us, elem)
+				unique[elem] = true
+			}
+		}
+	}
+	return us
+}
+
+func sanitizeLabelKey(key string, isPrefix bool) string {
+	if len(key) == 0 {
+		return key
+	}
+	key = strings.TrimSpace(key)
+	if isPrefix && key[0] >= '0' && key[0] <= '9' {
+		key = "_" + key
+	}
+	return strings.Map(func(r rune) rune {
+		if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_' || (r >= '0' && r <= '9') {
+			return r
+		}
+		return '_'
+	}, key)
+}
diff --git a/pkg/logql/log/util_test.go b/pkg/logql/log/util_test.go
new file mode 100644
index 0000000000000..b16172abf9e27
--- /dev/null
+++ b/pkg/logql/log/util_test.go
@@ -0,0 +1,24 @@
+package log
+
+import "testing"
+
+func Test_sanitizeLabelKey(t *testing.T) {
+	tests := []struct {
+		key  string
+		want string
+	}{
+		{"1", "_1"},
+		{"1 1 1", "_1_1_1"},
+		{"abc", "abc"},
+		{"$a$bc", "_a_bc"},
+		{"$a$bc", "_a_bc"},
+		{"   1 1 1  \t", "_1_1_1"},
+	}
+	for _, tt := range tests {
+		t.Run(tt.key, func(t *testing.T) {
+			if got := sanitizeLabelKey(tt.key, true); got != tt.want {
+				t.Errorf("sanitizeKey() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

From 1c515221601b52c0258f5c6a59d9f5222540edd4 Mon Sep 17 00:00:00 2001
From: Cyril Tovena <cyril.tovena@gmail.com>
Date: Tue, 15 Dec 2020 10:31:52 +0100
Subject: [PATCH 2/3] typos.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
---
 pkg/logql/log/parser.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go
index caa83f518b7b3..b87ff665b829d 100644
--- a/pkg/logql/log/parser.go
+++ b/pkg/logql/log/parser.go
@@ -88,7 +88,7 @@ func (j *JSONParser) parseMap(prefix string) func(iter *jsoniter.Iterator, field
 }
 
 func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) {
-	// first time time we add return the field as prefix.
+	// The first time we add a key, return the field as the prefix.
 	if len(prefix) == 0 {
 		field = sanitizeLabelKey(field, true)
 		if isValidKeyPrefix(field, j.lbs.ParserLabelHints()) {
@@ -139,7 +139,7 @@ func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field stri
 		return
 
 	}
-	// other we build the label key using the buffer
+	// otherwise we build the label key using the buffer
 	j.buf = j.buf[:0]
 	j.buf = append(j.buf, prefix...)
 	j.buf = append(j.buf, byte(jsonSpacer))

From 3713d3199a9c49a91003ac95ecbd57cd7972edc4 Mon Sep 17 00:00:00 2001
From: Cyril Tovena <cyril.tovena@gmail.com>
Date: Tue, 5 Jan 2021 09:38:42 +0100
Subject: [PATCH 3/3] Review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
---
 pkg/logql/log/labels.go             | 2 +-
 pkg/logql/log/metrics_extraction.go | 9 ++++++---
 pkg/logql/log/parser.go             | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index e32dd8f870d3b..3d6d2bd833d23 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -131,7 +131,7 @@ func (b *LabelsBuilder) Reset() {
 	b.err = ""
 }
 
-// ExpectedLabels returns a limited list of expected labels to extract for metric queries.
+// ParserLabelHints returns a limited list of expected labels to extract for metric queries.
 // Returns nil when it's impossible to hint labels extractions.
 func (b *BaseLabelsBuilder) ParserLabelHints() []string {
 	return b.parserKeyHints
diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go
index 861bd7b4df74e..f710b47a1824a 100644
--- a/pkg/logql/log/metrics_extraction.go
+++ b/pkg/logql/log/metrics_extraction.go
@@ -51,7 +51,8 @@ func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, w
 	s := ReduceStages(stages)
 	var expectedLabels []string
 	if !without {
-		expectedLabels = uniqueString(append(s.RequiredLabelNames(), groups...))
+		expectedLabels = append(expectedLabels, s.RequiredLabelNames()...)
+		expectedLabels = uniqueString(append(expectedLabels, groups...))
 	}
 	return &lineSampleExtractor{
 		Stage:            s,
@@ -137,14 +138,16 @@ func LabelExtractorWithStages(
 		groups = append(groups, labelName)
 		sort.Strings(groups)
 	}
+	preStage := ReduceStages(preStages)
 	var expectedLabels []string
 	if !without {
-		expectedLabels = append(postFilter.RequiredLabelNames(), groups...)
+		expectedLabels = append(expectedLabels, preStage.RequiredLabelNames()...)
+		expectedLabels = append(expectedLabels, groups...)
 		expectedLabels = append(expectedLabels, postFilter.RequiredLabelNames()...)
 		expectedLabels = uniqueString(expectedLabels)
 	}
 	return &labelSampleExtractor{
-		preStage:         ReduceStages(preStages),
+		preStage:         preStage,
 		conversionFn:     convFn,
 		labelName:        labelName,
 		postFilter:       postFilter,
diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go
index b87ff665b829d..47bd612c534f7 100644
--- a/pkg/logql/log/parser.go
+++ b/pkg/logql/log/parser.go
@@ -58,7 +58,7 @@ func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 func (j *JSONParser) readObject(it *jsoniter.Iterator) error {
 	// we only care about object and values.
 	if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue {
-		return errors.New("not a json object")
+		return fmt.Errorf("expecting json object(%d), got %d", jsoniter.ObjectValue, nextType)
 	}
 	_ = it.ReadMapCB(j.parseMap(""))
 	if it.Error != nil && it.Error != io.EOF {