Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata to labels result and filtering support #9702

Merged
merged 22 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e7d0535
Add metadata labels to LabelResults and allow filtering
salvacorts Jun 13, 2023
c370b5c
support metric queries
salvacorts Jun 26, 2023
3821cbd
Fix test - now that metadata is added to the labelResults, the itera…
salvacorts Jun 27, 2023
790aa6b
Fix lint issues
salvacorts Jun 27, 2023
60f04a7
Minor fixes after rebase
salvacorts Jul 18, 2023
321cf84
Comment-out integration test and fix another test
salvacorts Jul 18, 2023
fb6bbd3
Add more tests
salvacorts Jul 18, 2023
1032a62
Rename argument
salvacorts Jul 18, 2023
c781c75
Add test for memchunk iter and pipeline
salvacorts Jul 18, 2023
3bb02da
Improve test to check for resulting labels
salvacorts Jul 19, 2023
6d457a5
Fix bug in moveNext wrt EOF not being correctly handled
salvacorts Jul 19, 2023
5716549
Remove integration test since it's now implemented as unit-test
salvacorts Jul 19, 2023
f3cc48e
Fix lint issues
salvacorts Jul 19, 2023
7370d07
Test metric queries with sample iterator
salvacorts Jul 19, 2023
920ec9a
More lint fixes
salvacorts Jul 19, 2023
b410b86
Fix lint issues (finally?)
salvacorts Jul 19, 2023
1e6f49b
Remove unneeded code after rebase
salvacorts Jul 19, 2023
cc6ce91
Merge branch 'main' into salvacorts/metadata-to-labelsResult-and-filt…
salvacorts Jul 20, 2023
ec685d4
Add more tests for metrics extractor
salvacorts Jul 20, 2023
b1f4b36
Merge branch 'main' into salvacorts/metadata-to-labelsResult-and-filt…
salvacorts Jul 21, 2023
37e7d8c
Improvements after merge
salvacorts Jul 21, 2023
ce51e4c
Update pkg/logql/log/pipeline.go
salvacorts Jul 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return
}
stats.AddHeadChunkBytes(int64(len(e.s)))
newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s)
newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s, e.nonIndexedLabels...)
if !matches {
return
}
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra

for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, parsedLabels, ok := extractor.ProcessString(e.t, e.s)
value, parsedLabels, ok := extractor.ProcessString(e.t, e.s, e.nonIndexedLabels...)
if !ok {
continue
}
Expand Down Expand Up @@ -1144,8 +1144,8 @@ type bufferedIterator struct {
currLine []byte // the current line, this is the same as the buffer but sliced the line size.
currTs int64

metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels.
currMetadataLabels [][]byte // The current labels.
metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels.
currMetadataLabels labels.Labels // The current labels.

closed bool
}
Expand Down Expand Up @@ -1177,15 +1177,29 @@ func (si *bufferedIterator) Next() bool {
}
}

ts, line, metaLabels, ok := si.moveNext()
ts, line, nonIndexedLabelsBuff, ok := si.moveNext()
if !ok {
si.Close()
return false
}

var nonIndexedLabels labels.Labels
if len(nonIndexedLabelsBuff) > 0 {
if len(nonIndexedLabelsBuff)%2 != 0 {
si.err = fmt.Errorf("expected even number of metadata labels, got %d", len(nonIndexedLabelsBuff))
return false
}

nonIndexedLabels = make(labels.Labels, len(nonIndexedLabelsBuff)/2)
for i := 0; i < len(nonIndexedLabelsBuff); i += 2 {
nonIndexedLabels[i/2].Name = string(nonIndexedLabelsBuff[i])
nonIndexedLabels[i/2].Value = string(nonIndexedLabelsBuff[i+1])
}
}

si.currTs = ts
si.currLine = line
si.currMetadataLabels = metaLabels
si.currMetadataLabels = nonIndexedLabels
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please rename currMetadataLabels to currNonIndexedLabels. maybe in a separate PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a separate PR in the oven renaming all those "metadata" references.

return true
}

Expand Down Expand Up @@ -1452,28 +1466,14 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
if len(e.currMetadataLabels)%2 != 0 {
e.err = fmt.Errorf("expected even number of metadata labels, got %d", len(e.currMetadataLabels))
return false
}

var nonIndexedLabels []logproto.LabelAdapter
if len(e.currMetadataLabels) > 0 {
nonIndexedLabels = make([]logproto.LabelAdapter, len(e.currMetadataLabels)/2)
for i := 0; i < len(e.currMetadataLabels); i += 2 {
nonIndexedLabels[i/2].Name = string(e.currMetadataLabels[i])
nonIndexedLabels[i/2].Value = string(e.currMetadataLabels[i+1])
}
}

newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine)
newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currMetadataLabels...)
if !matches {
continue
}

e.stats.AddPostFilterLines(1)
e.currLabels = lbs
e.cur.NonIndexedLabels = nonIndexedLabels
e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currMetadataLabels)
e.cur.Timestamp = time.Unix(0, e.currTs)
e.cur.Line = string(newLine)
return true
Expand All @@ -1500,7 +1500,7 @@ type sampleBufferedIterator struct {

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currTs, e.currLine)
val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currMetadataLabels...)
if !ok {
continue
}
Expand Down
211 changes: 177 additions & 34 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,6 @@ func TestBlock(t *testing.T) {
{
ts: 8,
str: "hello, worl\nd8!",
lbs: []logproto.LabelAdapter{
{Name: "a", Value: "a2"},
{Name: "b", Value: "b"},
},
},
{
ts: 8,
Expand All @@ -158,6 +154,14 @@ func TestBlock(t *testing.T) {
ts: 9,
str: "",
},
{
ts: 10,
str: "hello, world10!",
lbs: []logproto.LabelAdapter{
{Name: "a", Value: "a2"},
{Name: "b", Value: "b"},
},
},
}

for _, c := range cases {
Expand Down Expand Up @@ -187,7 +191,6 @@ func TestBlock(t *testing.T) {
require.NoError(t, it.Close())
require.Equal(t, len(cases), idx)

// TODO: Test labels and metadata labels here.
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
idx = 0
for sampleIt.Next() {
Expand Down Expand Up @@ -770,10 +773,10 @@ func BenchmarkWrite(b *testing.B) {
type nomatchPipeline struct{}

func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
func (nomatchPipeline) Process(_ int64, line []byte) ([]byte, log.LabelsResult, bool) {
func (nomatchPipeline) Process(_ int64, line []byte, _ ...labels.Label) ([]byte, log.LabelsResult, bool) {
return line, nil, false
}
func (nomatchPipeline) ProcessString(_ int64, line string) (string, log.LabelsResult, bool) {
func (nomatchPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) {
return line, nil, false
}

Expand Down Expand Up @@ -1547,6 +1550,9 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
for _, enc := range testEncoding {
enc := enc
t.Run(enc.String(), func(t *testing.T) {
streamLabels := labels.Labels{
{Name: "job", Value: "fake"},
}
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
Expand All @@ -1566,14 +1572,6 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
{Name: "user", Value: "d"},
})))

expectedLines := []string{"lineA", "lineB", "lineC", "lineD"}
expectedStreams := []string{
labels.FromStrings("traceID", "123", "user", "a").String(),
labels.FromStrings("traceID", "456", "user", "b").String(),
labels.FromStrings("traceID", "789", "user", "c").String(),
labels.FromStrings("traceID", "123", "user", "d").String(),
}

// The expected bytes is the sum of bytes decompressed and bytes read from the head chunk.
// First we add the bytes read from the store (aka decompressed). That's
// nonIndexedLabelsBytes = n. lines * (n. labels <int> + (2 * n. labels) * (label length <int> + label))
Expand All @@ -1588,28 +1586,173 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
// Finally, the expected total bytes is the line bytes + non-indexed labels bytes
expectedBytes := lineBytes + expectedNonIndexedLabelsBytes

// We will run the test twice so the iterator will be created twice.
// This is to ensure that the iterator is correctly closed.
for i := 0; i < 2; i++ {
sts, ctx := stats.NewContext(context.Background())
for _, tc := range []struct {
name string
query string
expectedLines []string
expectedStreams []string
}{
{
name: "no-filter",
query: `{job="fake"}`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "traceID", "123", "user", "a").String(),
labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(),
},
},
{
name: "filter",
query: `{job="fake"} | traceID="789"`,
expectedLines: []string{"lineC"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
},
},
{
name: "filter-regex-or",
query: `{job="fake"} | traceID=~"456|789"`,
expectedLines: []string{"lineB", "lineC"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
},
},
{
name: "filter-regex-contains",
query: `{job="fake"} | traceID=~".*5.*"`,
expectedLines: []string{"lineB"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
},
},
{
name: "filter-regex-complex",
query: `{job="fake"} | traceID=~"^[0-9]2.*"`,
expectedLines: []string{"lineA", "lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "traceID", "123", "user", "a").String(),
labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(),
},
},
{
name: "multiple-filters",
query: `{job="fake"} | traceID="123" | user="d"`,
expectedLines: []string{"lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "traceID", "123", "user", "d").String(),
},
},
{
name: "metadata-and-keep",
query: `{job="fake"} | keep job, user`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "user", "a").String(),
labels.FromStrings("job", "fake", "user", "b").String(),
labels.FromStrings("job", "fake", "user", "c").String(),
labels.FromStrings("job", "fake", "user", "d").String(),
},
},
{
name: "metadata-and-keep-filter",
query: `{job="fake"} | keep job, user="b"`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake").String(),
labels.FromStrings("job", "fake", "user", "b").String(),
labels.FromStrings("job", "fake").String(),
labels.FromStrings("job", "fake").String(),
},
},
{
name: "metadata-and-drop",
query: `{job="fake"} | drop traceID`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "user", "a").String(),
labels.FromStrings("job", "fake", "user", "b").String(),
labels.FromStrings("job", "fake", "user", "c").String(),
labels.FromStrings("job", "fake", "user", "d").String(),
},
},
{
name: "metadata-and-drop-filter",
query: `{job="fake"} | drop traceID="123"`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
labels.FromStrings("job", "fake", "user", "a").String(),
labels.FromStrings("job", "fake", "traceID", "456", "user", "b").String(),
labels.FromStrings("job", "fake", "traceID", "789", "user", "c").String(),
labels.FromStrings("job", "fake", "user", "d").String(),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Run("log", func(t *testing.T) {
expr, err := syntax.ParseLogSelector(tc.query, true)
require.NoError(t, err)

pipeline, err := expr.Pipeline()
require.NoError(t, err)

// We will run the test twice so the iterator will be created twice.
// This is to ensure that the iterator is correctly closed.
for i := 0; i < 2; i++ {
sts, ctx := stats.NewContext(context.Background())
it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, pipeline.ForStream(streamLabels))
require.NoError(t, err)

var lines []string
var streams []string
for it.Next() {
require.NoError(t, it.Error())
e := it.Entry()
lines = append(lines, e.Line)
streams = append(streams, it.Labels())
}
assert.ElementsMatch(t, tc.expectedLines, lines)
assert.ElementsMatch(t, tc.expectedStreams, streams)

resultStats := sts.Result(0, 0, len(lines))
require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
}
})

it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
t.Run("metric", func(t *testing.T) {
query := fmt.Sprintf(`count_over_time(%s [1d])`, tc.query)
expr, err := syntax.ParseSampleExpr(query)
require.NoError(t, err)

var lines []string
var streams []string
for it.Next() {
require.NoError(t, it.Error())
e := it.Entry()
lines = append(lines, e.Line)
streams = append(streams, logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels).String())
}
assert.ElementsMatch(t, expectedLines, lines)
assert.ElementsMatch(t, expectedStreams, streams)
extractor, err := expr.Extractor()
require.NoError(t, err)

resultStats := sts.Result(0, 0, len(lines))
require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
// We will run the test twice so the iterator will be created twice.
// This is to ensure that the iterator is correctly closed.
for i := 0; i < 2; i++ {
sts, ctx := stats.NewContext(context.Background())
it := chk.SampleIterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractor.ForStream(streamLabels))

var sumValues int
var streams []string
for it.Next() {
require.NoError(t, it.Error())
e := it.Sample()
sumValues += int(e.Value)
streams = append(streams, it.Labels())
}
require.Equal(t, len(tc.expectedLines), sumValues)
assert.ElementsMatch(t, tc.expectedStreams, streams)

resultStats := sts.Result(0, 0, 0)
require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
}
})
})
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (hb *unorderedHeadBlock) Iterator(
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, nonIndexedLabels labels.Labels) error {
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line)
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, nonIndexedLabels...)
if !matches {
return nil
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, metaLabels labels.Labels) error {
value, parsedLabels, ok := extractor.ProcessString(ts, line)
value, parsedLabels, ok := extractor.ProcessString(ts, line, metaLabels...)
if !ok {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/unordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
{
desc: "ts collision forward",
input: []entry{
{0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
{0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
},
exp: []entry{
{0, "a", nil}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
{0, "a", labels.Labels{{Name: "a", Value: "b"}}}, {0, "b", labels.Labels{{Name: "a", Value: "b"}}}, {1, "c", nil},
},
},
{
Expand Down
Loading