Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intern label keys for LogQL parser. #3990

Merged
merged 3 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
)

const MaxInternedStrings = 1024

var emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash())

// LabelsResult is a computed labels result that contains the labels set with associated string and hash.
Expand Down Expand Up @@ -368,3 +370,24 @@ func (b *LabelsBuilder) toBaseGroup() LabelsResult {
b.groupedResult = res
return res
}

type internedStringSet map[string]struct {
s string
ok bool
}

func (i internedStringSet) Get(data []byte, createNew func() (string, bool)) (string, bool) {
s, ok := i[string(data)]
if ok {
return s.s, s.ok
}
new, ok := createNew()
if len(i) >= MaxInternedStrings {
return new, ok
}
i[string(data)] = struct {
s string
ok bool
}{s: new, ok: ok}
return new, ok
}
131 changes: 89 additions & 42 deletions pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ var (
type JSONParser struct {
buf []byte // buffer used to build json keys
lbs *LabelsBuilder

keys internedStringSet
}

// NewJSONParser creates a log stage that can parse a json log line and add properties as labels.
func NewJSONParser() *JSONParser {
return &JSONParser{
buf: make([]byte, 0, 1024),
buf: make([]byte, 0, 1024),
keys: internedStringSet{},
}
}

Expand Down Expand Up @@ -110,7 +113,7 @@ func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) {
j.buf = append(j.buf, byte(jsonSpacer))
j.buf = append(j.buf, sanitizeLabelKey(field, false)...)
// if matches keep going
if j.lbs.ParserLabelHints().ShouldExtractPrefix(string(j.buf)) {
if j.lbs.ParserLabelHints().ShouldExtractPrefix(unsafeGetString(j.buf)) {
return string(j.buf), true
}
return "", false
Expand All @@ -119,17 +122,21 @@ func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) {
func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field string) {
// the first time we use the field as label key.
if len(prefix) == 0 {
field = sanitizeLabelKey(field, true)
if !j.lbs.ParserLabelHints().ShouldExtract(field) {
// we can skip the value
key, ok := j.keys.Get(unsafeGetBytes(field), func() (string, bool) {
field = sanitizeLabelKey(field, true)
if !j.lbs.ParserLabelHints().ShouldExtract(field) {
return "", false
}
if j.lbs.BaseHas(field) {
field = field + duplicateSuffix
}
return field, true
})
if !ok {
iter.Skip()
return

}
if j.lbs.BaseHas(field) {
field = field + duplicateSuffix
}
j.lbs.Set(field, readValue(iter))
j.lbs.Set(key, readValue(iter))
return

}
Expand All @@ -138,14 +145,20 @@ func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field stri
j.buf = append(j.buf, prefix...)
j.buf = append(j.buf, byte(jsonSpacer))
j.buf = append(j.buf, sanitizeLabelKey(field, false)...)
if j.lbs.BaseHas(string(j.buf)) {
j.buf = append(j.buf, duplicateSuffix...)
}
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) {
key, ok := j.keys.Get(j.buf, func() (string, bool) {
if j.lbs.BaseHas(string(j.buf)) {
j.buf = append(j.buf, duplicateSuffix...)
}
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) {
return "", false
}
return string(j.buf), true
})
if !ok {
iter.Skip()
return
}
j.lbs.Set(string(j.buf), readValue(iter))
j.lbs.Set(key, readValue(iter))
}

func (j *JSONParser) RequiredLabelNames() []string { return []string{} }
Expand All @@ -172,20 +185,11 @@ func readValue(iter *jsoniter.Iterator) string {
}
}

func addLabel(lbs *LabelsBuilder, key, value string) {
key = sanitizeLabelKey(key, true)
if len(key) == 0 {
return
}
if lbs.BaseHas(key) {
key = fmt.Sprintf("%s%s", key, duplicateSuffix)
}
lbs.Set(key, value)
}

type RegexpParser struct {
regex *regexp.Regexp
nameIndex map[int]string

keys internedStringSet
}

// NewRegexpParser creates a new log stage that can extract labels from a log line using a regex expression.
Expand Down Expand Up @@ -218,13 +222,27 @@ func NewRegexpParser(re string) (*RegexpParser, error) {
return &RegexpParser{
regex: regex,
nameIndex: nameIndex,
keys: internedStringSet{},
}, nil
}

func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
for i, value := range r.regex.FindSubmatch(line) {
if name, ok := r.nameIndex[i]; ok {
addLabel(lbs, name, string(value))
key, ok := r.keys.Get(unsafeGetBytes(name), func() (string, bool) {
sanitize := sanitizeLabelKey(name, true)
if len(sanitize) == 0 {
return "", false
}
if lbs.BaseHas(sanitize) {
sanitize = fmt.Sprintf("%s%s", sanitize, duplicateSuffix)
}
return sanitize, true
})
if !ok {
continue
}
lbs.Set(key, string(value))
}
}
return line, true
Expand All @@ -233,14 +251,16 @@ func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
func (r *RegexpParser) RequiredLabelNames() []string { return []string{} }

type LogfmtParser struct {
dec *logfmt.Decoder
dec *logfmt.Decoder
keys internedStringSet
}

// NewLogfmtParser creates a parser that can extract labels from a logfmt log line.
// Each keyval is extracted into a respective label.
func NewLogfmtParser() *LogfmtParser {
return &LogfmtParser{
dec: logfmt.NewDecoder(nil),
dec: logfmt.NewDecoder(nil),
keys: internedStringSet{},
}
}

Expand All @@ -250,16 +270,28 @@ func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
}
l.dec.Reset(line)
for l.dec.ScanKeyval() {
if !lbs.ParserLabelHints().ShouldExtract(sanitizeLabelKey(string(l.dec.Key()), true)) {
key, ok := l.keys.Get(l.dec.Key(), func() (string, bool) {
sanitized := sanitizeLabelKey(string(l.dec.Key()), true)
if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
if len(sanitized) == 0 {
return "", false
}
if lbs.BaseHas(sanitized) {
sanitized = fmt.Sprintf("%s%s", sanitized, duplicateSuffix)
}
return sanitized, true
})
if !ok {
continue
}
key := string(l.dec.Key())
val := l.dec.Value()
// the rune error replacement is rejected by Prometheus, so we skip it.
if bytes.ContainsRune(val, utf8.RuneError) {
val = nil
}
addLabel(lbs, key, string(val))
lbs.Set(key, string(val))
}
if l.dec.Err() != nil {
lbs.SetErr(errLogfmt)
Expand Down Expand Up @@ -315,6 +347,8 @@ func (l *PatternParser) RequiredLabelNames() []string { return []string{} }

type JSONExpressionParser struct {
expressions map[string][]interface{}

keys internedStringSet
}

func NewJSONExpressionParser(expressions []JSONExpression) (*JSONExpressionParser, error) {
Expand All @@ -335,6 +369,7 @@ func NewJSONExpressionParser(expressions []JSONExpression) (*JSONExpressionParse

return &JSONExpressionParser{
expressions: paths,
keys: internedStringSet{},
}, nil
}

Expand All @@ -350,12 +385,14 @@ func (j *JSONExpressionParser) Process(line []byte, lbs *LabelsBuilder) ([]byte,

for identifier, paths := range j.expressions {
result := jsoniter.ConfigFastest.Get(line, paths...).ToString()
key, _ := j.keys.Get(unsafeGetBytes(identifier), func() (string, bool) {
if lbs.BaseHas(identifier) {
identifier = identifier + duplicateSuffix
}
return identifier, true
})

if lbs.BaseHas(identifier) {
identifier = identifier + duplicateSuffix
}

lbs.Set(identifier, result)
lbs.Set(key, result)
}

return line, true
Expand All @@ -365,6 +402,8 @@ func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{}

type UnpackParser struct {
lbsBuffer []string

keys internedStringSet
}

// NewUnpackParser creates a new unpack stage.
Expand All @@ -374,6 +413,7 @@ type UnpackParser struct {
func NewUnpackParser() *UnpackParser {
return &UnpackParser{
lbsBuffer: make([]string, 0, 16),
keys: internedStringSet{},
}
}

Expand Down Expand Up @@ -413,15 +453,22 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, entry []byte, lbs *LabelsBu
isPacked = true
return true
}
if !lbs.ParserLabelHints().ShouldExtract(field) {
key, ok := u.keys.Get(unsafeGetBytes(field), func() (string, bool) {
if !lbs.ParserLabelHints().ShouldExtract(field) {
return "", false
}
if lbs.BaseHas(field) {
field = field + duplicateSuffix
}
return field, true
})
if !ok {
iter.Skip()
return true
}
if lbs.BaseHas(field) {
field = field + duplicateSuffix
}

// append to the buffer of labels
u.lbsBuffer = append(u.lbsBuffer, field, iter.ReadString())
u.lbsBuffer = append(u.lbsBuffer, key, iter.ReadString())
default:
iter.Skip()
}
Expand Down