Skip to content

Commit

Permalink
Add a test to verify stats in the LogQL engine.
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Feb 3, 2020
1 parent 6a2b377 commit b14fe60
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 22 deletions.
10 changes: 10 additions & 0 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ type EntryIterator interface {
Close() error
}

type noOpIterator struct{}

var NoopIterator = noOpIterator{}

func (noOpIterator) Next() bool { return false }
func (noOpIterator) Error() error { return nil }
func (noOpIterator) Labels() string { return "" }
func (noOpIterator) Entry() logproto.Entry { return logproto.Entry{} }
func (noOpIterator) Close() error { return nil }

// streamIterator iterates over entries in a stream.
type streamIterator struct {
i int
Expand Down
46 changes: 24 additions & 22 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,33 @@ type query struct {

// Exec Implements `Query`
func (q *query) Exec(ctx context.Context) (Result, error) {
log, ctx := spanlogger.New(ctx, "Engine.Exec")
defer log.Finish()

var queryType string

if IsInstant(q) {
queryType = "instant"
} else {
queryType = "range"
}

timer := prometheus.NewTimer(queryTime.WithLabelValues(queryType))
defer timer.ObserveDuration()
data, statistics, err := q.ng.exec(ctx, q)

// records query statistics
var statResult stats.Result
start := time.Now()
ctx = stats.NewContext(ctx)

data, err := q.ng.exec(ctx, q)

statResult = stats.Snapshot(ctx, time.Since(start))
statResult.Log(level.Debug(log))

return Result{
Data: data,
Statistics: statistics,
Statistics: statResult,
}, err
}

Expand Down Expand Up @@ -159,52 +173,40 @@ func (ng *engine) NewInstantQuery(
}
}

func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, stats.Result, error) {
log, ctx := spanlogger.New(ctx, "Engine.exec")
defer log.Finish()

func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
defer cancel()

// records query statistics
var statResult stats.Result
ctx = stats.NewContext(ctx)
start := time.Now()
defer func() {
statResult = stats.Snapshot(ctx, time.Since(start))
statResult.Log(level.Debug(log))
}()

qs := q.String()
// This is a legacy query used for health checking. Not the best practice, but it works.
if qs == "1+1" {
if IsInstant(q) {
return promql.Vector{}, statResult, nil
return promql.Vector{}, nil
}
return promql.Matrix{}, statResult, nil
return promql.Matrix{}, nil
}

expr, err := ParseExpr(qs)
if err != nil {
return nil, statResult, err
return nil, err
}

switch e := expr.(type) {
case SampleExpr:
value, err := ng.evalSample(ctx, e, q)
return value, statResult, err
return value, err

case LogSelectorExpr:
iter, err := ng.evaluator.Iterator(ctx, e, q)
if err != nil {
return nil, statResult, err
return nil, err
}
defer helpers.LogError("closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit)
return streams, statResult, err
return streams, err
}

return nil, statResult, nil
return nil, nil
}

// evalSample evaluate a sampleExpr
Expand Down
14 changes: 14 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
json "github.com/json-iterator/go"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var testSize = int64(300)
Expand Down Expand Up @@ -691,6 +693,18 @@ func TestEngine_NewRangeQuery(t *testing.T) {
}
}

func TestEngine_Stats(t *testing.T) {
eng := NewEngine(EngineOpts{}, QuerierFunc(func(ctx context.Context, sp SelectParams) (iter.EntryIterator, error) {
st := stats.GetChunkData(ctx)
st.DecompressedBytes++
return iter.NoopIterator, nil
}))
q := eng.NewInstantQuery(`{foo="bar"}`, time.Now(), logproto.BACKWARD, 1000)
r, err := q.Exec(context.Background())
require.NoError(t, err)
require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes)
}

// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
func BenchmarkRangeQuery100000(b *testing.B) {
benchmarkRangeQuery(int64(100000), b)
Expand Down

0 comments on commit b14fe60

Please sign in to comment.