Skip to content

Commit

Permalink
renames skip -> ok in ProcessString (#6064)
Browse files Browse the repository at this point in the history
* renames skip -> ok in ProcessString
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* streampipeline matches convention
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d authored May 3, 2022
1 parent 8b15632 commit 93de7a7
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 52 deletions.
9 changes: 5 additions & 4 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,12 +1010,13 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return
}
stats.AddHeadChunkBytes(int64(len(e.s)))
newLine, parsedLbs, ok := pipeline.ProcessString(e.t, e.s)
if !ok {
newLine, parsedLbs, matches := pipeline.ProcessString(e.t, e.s)
if !matches {
return
}
var stream *logproto.Stream
labels := parsedLbs.Labels().String()
var ok bool
if stream, ok = streams[labels]; !ok {
stream = &logproto.Stream{
Labels: labels,
Expand Down Expand Up @@ -1267,8 +1268,8 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, ok := e.pipeline.Process(e.currTs, e.currLine)
if !ok {
newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine)
if !matches {
continue
}
e.cur.Timestamp = time.Unix(0, e.currTs)
Expand Down
5 changes: 3 additions & 2 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ func (hb *unorderedHeadBlock) Iterator(
mint,
maxt,
func(ts int64, line string) error {
newLine, parsedLbs, ok := pipeline.ProcessString(ts, line)
if !ok {
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line)
if !matches {
return nil
}

var stream *logproto.Stream
labels := parsedLbs.String()
var ok bool
if stream, ok = streams[labels]; !ok {
stream = &logproto.Stream{
Labels: labels,
Expand Down
5 changes: 3 additions & 2 deletions pkg/logcli/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,14 @@ func newFileIterator(

processLine := func(line string) {
ts := time.Now()
parsedLine, parsedLabels, ok := pipeline.ProcessString(ts.UnixNano(), line)
if !ok {
parsedLine, parsedLabels, matches := pipeline.ProcessString(ts.UnixNano(), line)
if !matches {
return
}

var stream *logproto.Stream
lhash := parsedLabels.Hash()
var ok bool
if stream, ok = streams[lhash]; !ok {
stream = &logproto.Stream{
Labels: parsedLabels.String(),
Expand Down
8 changes: 4 additions & 4 deletions pkg/logql/log/metrics_extraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func (sp *filteringStreamExtractor) Process(ts int64, line []byte) (float64, Lab
continue
}

_, _, skip := filter.pipeline.Process(ts, line)
if skip { //When the filter matches, don't run the next step
_, _, matches := filter.pipeline.Process(ts, line)
if matches { //When the filter matches, don't run the next step
return 0, nil, false
}
}
Expand All @@ -264,8 +264,8 @@ func (sp *filteringStreamExtractor) ProcessString(ts int64, line string) (float6
continue
}

_, _, skip := filter.pipeline.ProcessString(ts, line)
if skip { //When the filter matches, don't run the next step
_, _, matches := filter.pipeline.ProcessString(ts, line)
if matches { //When the filter matches, don't run the next step
return 0, nil, false
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/logql/log/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Pipeline interface {
// A StreamPipeline never mutate the received line.
type StreamPipeline interface {
BaseLabels() LabelsResult
Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, skip bool)
ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, skip bool)
Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, matches bool)
ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, matches bool)
}

// Stage is a single step of a Pipeline.
Expand Down Expand Up @@ -232,8 +232,8 @@ func (sp *filteringStreamPipeline) Process(ts int64, line []byte) ([]byte, Label
continue
}

_, _, skip := filter.pipeline.Process(ts, line)
if skip { // When the filter matches, don't run the next step
_, _, matches := filter.pipeline.Process(ts, line)
if matches { // When the filter matches, don't run the next step
return nil, nil, false
}
}
Expand All @@ -247,8 +247,8 @@ func (sp *filteringStreamPipeline) ProcessString(ts int64, line string) (string,
continue
}

_, _, skip := filter.pipeline.ProcessString(ts, line)
if skip { // When the filter matches, don't run the next step
_, _, matches := filter.pipeline.ProcessString(ts, line)
if matches { // When the filter matches, don't run the next step
return "", nil, false
}
}
Expand Down
54 changes: 27 additions & 27 deletions pkg/logql/log/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (

func TestNoopPipeline(t *testing.T) {
lbs := labels.Labels{{Name: "foo", Value: "bar"}}
l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process(0, []byte(""))
l, lbr, matches := NewNoopPipeline().ForStream(lbs).Process(0, []byte(""))
require.Equal(t, []byte(""), l)
require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
require.Equal(t, true, ok)
require.Equal(t, true, matches)

ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString(0, "")
ls, lbr, matches := NewNoopPipeline().ForStream(lbs).ProcessString(0, "")
require.Equal(t, "", ls)
require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
require.Equal(t, true, ok)
require.Equal(t, true, matches)
}

func TestPipeline(t *testing.T) {
Expand All @@ -29,25 +29,25 @@ func TestPipeline(t *testing.T) {
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")),
newMustLineFormatter("lbs {{.foo}}"),
})
l, lbr, ok := p.ForStream(lbs).Process(0, []byte("line"))
l, lbr, matches := p.ForStream(lbs).Process(0, []byte("line"))
require.Equal(t, []byte("lbs bar"), l)
require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
require.Equal(t, true, ok)
require.Equal(t, true, matches)

ls, lbr, ok := p.ForStream(lbs).ProcessString(0, "line")
ls, lbr, matches := p.ForStream(lbs).ProcessString(0, "line")
require.Equal(t, "lbs bar", ls)
require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
require.Equal(t, true, ok)
require.Equal(t, true, matches)

l, lbr, ok = p.ForStream(labels.Labels{}).Process(0, []byte("line"))
l, lbr, matches = p.ForStream(labels.Labels{}).Process(0, []byte("line"))
require.Equal(t, []byte(nil), l)
require.Equal(t, nil, lbr)
require.Equal(t, false, ok)
require.Equal(t, false, matches)

ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString(0, "line")
ls, lbr, matches = p.ForStream(labels.Labels{}).ProcessString(0, "line")
require.Equal(t, "", ls)
require.Equal(t, nil, lbr)
require.Equal(t, false, ok)
require.Equal(t, false, matches)
}

func TestFilteringPipeline(t *testing.T) {
Expand All @@ -74,11 +74,11 @@ func TestFilteringPipeline(t *testing.T) {

for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
_, _, ok := p.ForStream(test.inputStreamLabels).Process(test.ts, []byte(test.line))
require.Equal(t, test.ok, ok)
_, _, matches := p.ForStream(test.inputStreamLabels).Process(test.ts, []byte(test.line))
require.Equal(t, test.ok, matches)

_, _, ok = p.ForStream(test.inputStreamLabels).ProcessString(test.ts, test.line)
require.Equal(t, test.ok, ok)
_, _, matches = p.ForStream(test.inputStreamLabels).ProcessString(test.ts, test.line)
require.Equal(t, test.ok, matches)
})
}
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (p *stubStreamPipeline) ProcessString(ts int64, line string) (string, Label
}

var (
resOK bool
resMatches bool
resLine []byte
resLineString string
resLbs LabelsResult
Expand Down Expand Up @@ -168,13 +168,13 @@ func Benchmark_Pipeline(b *testing.B) {
b.Run("pipeline bytes", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resLine, resLbs, resOK = sp.Process(0, line)
resLine, resLbs, resMatches = sp.Process(0, line)
}
})
b.Run("pipeline string", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resLineString, resLbs, resOK = sp.ProcessString(0, lineString)
resLineString, resLbs, resMatches = sp.ProcessString(0, lineString)
}
})

Expand All @@ -184,13 +184,13 @@ func Benchmark_Pipeline(b *testing.B) {
b.Run("line extractor bytes", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resSample, resLbs, resOK = ex.Process(0, line)
resSample, resLbs, resMatches = ex.Process(0, line)
}
})
b.Run("line extractor string", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resSample, resLbs, resOK = ex.ProcessString(0, lineString)
resSample, resLbs, resMatches = ex.ProcessString(0, lineString)
}
})

Expand All @@ -201,13 +201,13 @@ func Benchmark_Pipeline(b *testing.B) {
b.Run("label extractor bytes", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resSample, resLbs, resOK = ex.Process(0, line)
resSample, resLbs, resMatches = ex.Process(0, line)
}
})
b.Run("label extractor string", func(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
resSample, resLbs, resOK = ex.ProcessString(0, lineString)
resSample, resLbs, resMatches = ex.ProcessString(0, lineString)
}
})
}
Expand Down Expand Up @@ -240,9 +240,9 @@ func jsonBenchmark(b *testing.B, parser Stage) {
b.ResetTimer()
sp := p.ForStream(lbs)
for n := 0; n < b.N; n++ {
resLine, resLbs, resOK = sp.Process(0, line)
resLine, resLbs, resMatches = sp.Process(0, line)

if !resOK {
if !resMatches {
b.Fatalf("resulting line not ok: %s\n", line)
}

Expand All @@ -263,9 +263,9 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) {
b.ResetTimer()
sp := p.ForStream(labels.Labels{})
for n := 0; n < b.N; n++ {
resLine, resLbs, resOK = sp.Process(0, line)
resLine, resLbs, resMatches = sp.Process(0, line)

if !resOK {
if !resMatches {
b.Fatalf("resulting line not ok: %s\n", line)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/logql/syntax/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ func Test_NilFilterDoesntPanic(t *testing.T) {

p, err := expr.Pipeline()
require.Nil(t, err)
_, _, ok := p.ForStream(labelBar).Process(0, []byte("bleepbloop"))
_, _, matches := p.ForStream(labelBar).Process(0, []byte("bleepbloop"))

require.True(t, ok)
require.True(t, matches)
})
}
}
Expand Down Expand Up @@ -287,8 +287,8 @@ func Test_FilterMatcher(t *testing.T) {
} else {
sp := p.ForStream(labelBar)
for _, lc := range tt.lines {
_, _, ok := sp.Process(0, []byte(lc.l))
assert.Equalf(t, lc.e, ok, "query for line '%s' was %v and not %v", lc.l, ok, lc.e)
_, _, matches := sp.Process(0, []byte(lc.l))
assert.Equalf(t, lc.e, matches, "query for line '%s' was %v and not %v", lc.l, matches, lc.e)
}
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/syntax/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3040,8 +3040,8 @@ func Test_PipelineCombined(t *testing.T) {
p, err := expr.Pipeline()
require.Nil(t, err)
sp := p.ForStream(labels.Labels{})
line, lbs, ok := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`))
require.True(t, ok)
line, lbs, matches := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`))
require.True(t, matches)
require.Equal(
t,
labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}},
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea
for _, stream := range in {
for _, e := range stream.Entries {
sp := pipeline.ForStream(mustParseLabels(stream.Labels))
if l, out, ok := sp.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok {
if l, out, matches := sp.Process(e.Timestamp.UnixNano(), []byte(e.Line)); matches {
var s *logproto.Stream
var found bool
s, found = resByStream[out.String()]
Expand Down

0 comments on commit 93de7a7

Please sign in to comment.