Metadata to labels result and filtering support (#9702)
**What this PR does / why we need it**:
In #9700, we added support for encoding and decoding per-entry metadata
into chunks. This PR builds on that by returning the metadata labels of
matching entries as part of a query's label results. It also adds
support for filtering logs by metadata labels.
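
For illustration, once entries carry non-indexed labels such as `traceID` and `user`, a query can filter on them directly (this mirrors the `multiple-filters` case in the updated tests):

```logql
{job="fake"} | traceID="123" | user="d"
```

Matching entries also surface these labels in the returned stream labels, e.g. `{job="fake", traceID="123", user="d"}`.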

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
salvacorts and sandeepsukhani authored Jul 24, 2023
1 parent ab3d58a commit 1d04cd5
Showing 9 changed files with 507 additions and 137 deletions.
46 changes: 23 additions & 23 deletions pkg/chunkenc/memchunk.go
@@ -1024,7 +1024,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return
}
stats.AddHeadChunkBytes(int64(len(e.s)))
-newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s)
+newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s, e.nonIndexedLabels...)
if !matches {
return
}
@@ -1077,7 +1077,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra

for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
-value, parsedLabels, ok := extractor.ProcessString(e.t, e.s)
+value, parsedLabels, ok := extractor.ProcessString(e.t, e.s, e.nonIndexedLabels...)
if !ok {
continue
}
@@ -1144,8 +1144,8 @@ type bufferedIterator struct {
currLine []byte // the current line, this is the same as the buffer but sliced the line size.
currTs int64

-metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels.
-currMetadataLabels [][]byte // The current labels.
+metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels.
+currMetadataLabels labels.Labels // The current labels.

closed bool
}
@@ -1177,15 +1177,29 @@ func (si *bufferedIterator) Next() bool {
}
}

-ts, line, metaLabels, ok := si.moveNext()
+ts, line, nonIndexedLabelsBuff, ok := si.moveNext()
if !ok {
si.Close()
return false
}

+var nonIndexedLabels labels.Labels
+if len(nonIndexedLabelsBuff) > 0 {
+if len(nonIndexedLabelsBuff)%2 != 0 {
+si.err = fmt.Errorf("expected even number of metadata labels, got %d", len(nonIndexedLabelsBuff))
+return false
+}

+nonIndexedLabels = make(labels.Labels, len(nonIndexedLabelsBuff)/2)
+for i := 0; i < len(nonIndexedLabelsBuff); i += 2 {
+nonIndexedLabels[i/2].Name = string(nonIndexedLabelsBuff[i])
+nonIndexedLabels[i/2].Value = string(nonIndexedLabelsBuff[i+1])
+}
+}

si.currTs = ts
si.currLine = line
-si.currMetadataLabels = metaLabels
+si.currMetadataLabels = nonIndexedLabels
return true
}

@@ -1452,28 +1466,14 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
-if len(e.currMetadataLabels)%2 != 0 {
-e.err = fmt.Errorf("expected even number of metadata labels, got %d", len(e.currMetadataLabels))
-return false
-}

-var nonIndexedLabels []logproto.LabelAdapter
-if len(e.currMetadataLabels) > 0 {
-nonIndexedLabels = make([]logproto.LabelAdapter, len(e.currMetadataLabels)/2)
-for i := 0; i < len(e.currMetadataLabels); i += 2 {
-nonIndexedLabels[i/2].Name = string(e.currMetadataLabels[i])
-nonIndexedLabels[i/2].Value = string(e.currMetadataLabels[i+1])
-}
-}

-newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine)
+newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currMetadataLabels...)
if !matches {
continue
}

e.stats.AddPostFilterLines(1)
e.currLabels = lbs
-e.cur.NonIndexedLabels = nonIndexedLabels
+e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currMetadataLabels)
e.cur.Timestamp = time.Unix(0, e.currTs)
e.cur.Line = string(newLine)
return true
@@ -1500,7 +1500,7 @@ type sampleBufferedIterator struct {

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
-val, labels, ok := e.extractor.Process(e.currTs, e.currLine)
+val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currMetadataLabels...)
if !ok {
continue
}
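Before the test changes, it may help to see the decoding step added to `bufferedIterator.Next` above in isolation. Here is a minimal, self-contained sketch of the same pairing logic, with a hypothetical sample buffer standing in for the bytes the real iterator gets from `moveNext`:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Alternating name/value byte slices, as buffered while reading an entry.
	buf := [][]byte{
		[]byte("traceID"), []byte("123"),
		[]byte("user"), []byte("a"),
	}
	// An odd-length buffer is corrupt; the real iterator surfaces this as an error.
	if len(buf)%2 != 0 {
		panic("expected even number of metadata labels")
	}
	// Pair adjacent slices into labels, exactly as the iterator loop does.
	lbls := make(labels.Labels, len(buf)/2)
	for i := 0; i < len(buf); i += 2 {
		lbls[i/2].Name = string(buf[i])
		lbls[i/2].Value = string(buf[i+1])
	}
	fmt.Println(lbls) // {traceID="123", user="a"}
}
```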
211 changes: 177 additions & 34 deletions pkg/chunkenc/memchunk_test.go
@@ -141,10 +141,6 @@ func TestBlock(t *testing.T) {
{
ts: 8,
str: "hello, worl\nd8!",
-lbs: []logproto.LabelAdapter{
-{Name: "a", Value: "a2"},
-{Name: "b", Value: "b"},
-},
},
{
ts: 8,
@@ -158,6 +154,14 @@ func TestBlock(t *testing.T) {
ts: 9,
str: "",
},
+{
+ts: 10,
+str: "hello, world10!",
+lbs: []logproto.LabelAdapter{
+{Name: "a", Value: "a2"},
+{Name: "b", Value: "b"},
+},
+},
}

for _, c := range cases {
@@ -187,7 +191,6 @@ func TestBlock(t *testing.T) {
require.NoError(t, it.Close())
require.Equal(t, len(cases), idx)

-// TODO: Test labels and metadata labels here.
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
idx = 0
for sampleIt.Next() {
@@ -770,10 +773,10 @@ func BenchmarkWrite(b *testing.B) {
type nomatchPipeline struct{}

func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
-func (nomatchPipeline) Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) {
+func (nomatchPipeline) Process(_ int64, line []byte, _ ...labels.Label) ([]byte, log.LabelsResult, bool) {
return line, nil, false
}
-func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) {
+func (nomatchPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) {
return line, nil, false
}

@@ -1547,6 +1550,9 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
for _, enc := range testEncoding {
enc := enc
t.Run(enc.String(), func(t *testing.T) {
+streamLabels := labels.Labels{
+{Name: "job", Value: "fake"},
+}
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
@@ -1566,14 +1572,6 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
{Name: "user", Value: "d"},
})))

-expectedLines := []string{"lineA", "lineB", "lineC", "lineD"}
-expectedStreams := []string{
-labels.FromStrings("traceID", "123", "user", "a").String(),
-labels.FromStrings("traceID", "456", "user", "b").String(),
-labels.FromStrings("traceID", "789", "user", "c").String(),
-labels.FromStrings("traceID", "123", "user", "d").String(),
-}

// The expected bytes is the sum of bytes decompressed and bytes read from the head chunk.
// First we add the bytes read from the store (aka decompressed). That's
// nonIndexedLabelsBytes = n. lines * (n. labels <int> + (2 * n. labels) * (label length <int> + label))
Expand All @@ -1588,28 +1586,173 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
// Finally, the expected total bytes is the line bytes + non-indexed labels bytes
expectedBytes := lineBytes + expectedNonIndexedLabelsBytes

-// We will run the test twice so the iterator will be created twice.
-// This is to ensure that the iterator is correctly closed.
-for i := 0; i < 2; i++ {
-sts, ctx := stats.NewContext(context.Background())
+for _, tc := range []struct {
+name string
+query string
+expectedLines []string
+expectedStreams []string
+}{
+{
+name: "no-filter",
+query: `{job="fake"}`,
+expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "traceID", "123", "user", "a").String(),
+labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
+labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
+labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(),
+},
+},
+{
+name: "filter",
+query: `{job="fake"} | traceID="789"`,
+expectedLines: []string{"lineC"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
+},
+},
+{
+name: "filter-regex-or",
+query: `{job="fake"} | traceID=~"456|789"`,
+expectedLines: []string{"lineB", "lineC"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
+labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
+},
+},
+{
+name: "filter-regex-contains",
+query: `{job="fake"} | traceID=~".*5.*"`,
+expectedLines: []string{"lineB"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
+},
+},
+{
+name: "filter-regex-complex",
+query: `{job="fake"} | traceID=~"^[0-9]2.*"`,
+expectedLines: []string{"lineA", "lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "traceID", "123", "user", "a").String(),
+labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(),
+},
+},
+{
+name: "multiple-filters",
+query: `{job="fake"} | traceID="123" | user="d"`,
+expectedLines: []string{"lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(),
+},
+},
+{
+name: "metadata-and-keep",
+query: `{job="fake"} | keep job, user`,
+expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "user", "a").String(),
+labels.FromStrings("job", "fake", "user", "b").String(),
+labels.FromStrings("job", "fake", "user", "c").String(),
+labels.FromStrings("job", "fake", "user", "d").String(),
+},
+},
+{
+name: "metadata-and-keep-filter",
+query: `{job="fake"} | keep job, user="b"`,
+expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake").String(),
+labels.FromStrings("job", "fake", "user", "b").String(),
+labels.FromStrings("job", "fake").String(),
+labels.FromStrings("job", "fake").String(),
+},
+},
+{
+name: "metadata-and-drop",
+query: `{job="fake"} | drop traceID`,
+expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "user", "a").String(),
+labels.FromStrings("job", "fake", "user", "b").String(),
+labels.FromStrings("job", "fake", "user", "c").String(),
+labels.FromStrings("job", "fake", "user", "d").String(),
+},
+},
+{
+name: "metadata-and-drop-filter",
+query: `{job="fake"} | drop traceID="123"`,
+expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
+expectedStreams: []string{
+labels.FromStrings("job", "fake", "user", "a").String(),
+labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
+labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
+labels.FromStrings("job", "fake", "user", "d").String(),
+},
+},
+} {
+t.Run(tc.name, func(t *testing.T) {
+t.Run("log", func(t *testing.T) {
+expr, err := syntax.ParseLogSelector(tc.query, true)
+require.NoError(t, err)

+pipeline, err := expr.Pipeline()
+require.NoError(t, err)

+// We will run the test twice so the iterator will be created twice.
+// This is to ensure that the iterator is correctly closed.
+for i := 0; i < 2; i++ {
+sts, ctx := stats.NewContext(context.Background())
+it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, pipeline.ForStream(streamLabels))
+require.NoError(t, err)

+var lines []string
+var streams []string
+for it.Next() {
+require.NoError(t, it.Error())
+e := it.Entry()
+lines = append(lines, e.Line)
+streams = append(streams, it.Labels())
+}
+assert.ElementsMatch(t, tc.expectedLines, lines)
+assert.ElementsMatch(t, tc.expectedStreams, streams)

+resultStats := sts.Result(0, 0, len(lines))
+require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
+require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
+}
+})

-it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
-require.NoError(t, err)
t.Run("metric", func(t *testing.T) {
query := fmt.Sprintf(`count_over_time(%s [1d])`, tc.query)
expr, err := syntax.ParseSampleExpr(query)
require.NoError(t, err)

-var lines []string
-var streams []string
-for it.Next() {
-require.NoError(t, it.Error())
-e := it.Entry()
-lines = append(lines, e.Line)
-streams = append(streams, logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels).String())
-}
-assert.ElementsMatch(t, expectedLines, lines)
-assert.ElementsMatch(t, expectedStreams, streams)
+extractor, err := expr.Extractor()
+require.NoError(t, err)

-resultStats := sts.Result(0, 0, len(lines))
-require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
-require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
+// We will run the test twice so the iterator will be created twice.
+// This is to ensure that the iterator is correctly closed.
+for i := 0; i < 2; i++ {
+sts, ctx := stats.NewContext(context.Background())
+it := chk.SampleIterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractor.ForStream(streamLabels))

+var sumValues int
+var streams []string
+for it.Next() {
+require.NoError(t, it.Error())
+e := it.Sample()
+sumValues += int(e.Value)
+streams = append(streams, it.Labels())
+}
+require.Equal(t, len(tc.expectedLines), sumValues)
+assert.ElementsMatch(t, tc.expectedStreams, streams)

+resultStats := sts.Result(0, 0, 0)
+require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
+require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
+}
+})
+})
}
})
}
4 changes: 2 additions & 2 deletions pkg/chunkenc/unordered.go
@@ -263,7 +263,7 @@ func (hb *unorderedHeadBlock) Iterator(
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, nonIndexedLabels labels.Labels) error {
-newLine, parsedLbs, matches := pipeline.ProcessString(ts, line)
+newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, nonIndexedLabels...)
if !matches {
return nil
}
@@ -313,7 +313,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, metaLabels labels.Labels) error {
-value, parsedLabels, ok := extractor.ProcessString(ts, line)
+value, parsedLabels, ok := extractor.ProcessString(ts, line, metaLabels...)
if !ok {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/unordered_test.go
@@ -127,10 +127,10 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
{
desc: "ts collision forward",
input: []entry{
{0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
{0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
},
exp: []entry{
{0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
{0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
},
},
{
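The mechanical change threaded through all four files is the same: pipeline and extractor entry points gain a variadic parameter for the entry's non-indexed labels. Below is a hedged sketch of the widened method shapes, inferred only from the call sites in this diff; the real interfaces live in `pkg/logql/log` and carry additional methods such as `BaseLabels`:

```go
package sketch

import (
	"github.com/grafana/loki/pkg/logql/log"
	"github.com/prometheus/prometheus/model/labels"
)

// Illustrative method sets only, inferred from this diff's call sites;
// not the exact Loki definitions.
type streamPipeline interface {
	Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) ([]byte, log.LabelsResult, bool)
	ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (string, log.LabelsResult, bool)
}

type streamSampleExtractor interface {
	Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) (float64, log.LabelsResult, bool)
	ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (float64, log.LabelsResult, bool)
}
```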