Skip to content

Commit

Permalink
Add query enqueue time to metrics.go
Browse files Browse the repository at this point in the history
  • Loading branch information
ssncferreira committed Dec 30, 2021
1 parent 1042037 commit 3b11ac8
Show file tree
Hide file tree
Showing 23 changed files with 220 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [4949](https://github.com/grafana/loki/pull/4949) **ssncferreira**: Add query `enqueueTime` metric to statistics and metrics.go
* [4993](https://github.com/grafana/loki/pull/4993) **thejosephstevens**: Fix parent of wal and wal_cleaner in loki ruler config docs
* [4933](https://github.com/grafana/loki/pull/4933) **jeschkies**: Support matchers in series label values query.
* [4926](https://github.com/grafana/loki/pull/4926) **thejosephstevens**: Fix comment in Loki module loading for accuracy
Expand Down
1 change: 1 addition & 0 deletions docs/sources/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ The examples below show all possible statistics returned with their respective d
},
"summary": {
"bytesProcessedPerSecond": 0, // Total of bytes processed per second
"enqueueTime": 0, // Total enqueue time in seconds (float)
"execTime": 0, // Total execution time in seconds (float)
"linesProcessedPerSecond": 0, // Total lines processed per second
"totalBytesProcessed":0, // Total amount of bytes processed overall for this request
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func TestChunkStats(t *testing.T) {
t.Fatal(err)
}
// test on a chunk filling up
s := statsCtx.Result(time.Since(first))
s := statsCtx.Result(time.Since(first), 0)
require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed)
require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed)

Expand All @@ -594,7 +594,7 @@ func TestChunkStats(t *testing.T) {
if err := it.Close(); err != nil {
t.Fatal(err)
}
s = statsCtx.Result(time.Since(first))
s = statsCtx.Result(time.Since(first), 0)
require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed)
require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed)

Expand Down
2 changes: 1 addition & 1 deletion pkg/iter/entry_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func Test_DuplicateCount(t *testing.T) {
defer it.Close()
for it.Next() {
}
require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0).TotalDuplicates())
require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0, 0).TotalDuplicates())
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
)

var (
Expand Down Expand Up @@ -119,7 +120,9 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {

data, err := q.Eval(ctx)

statResult := statsCtx.Result(time.Since(start))
enqueueTime, _ := ctx.Value(httpreq.QueryEnqueueTimeHTTPHeader).(time.Duration)

statResult := statsCtx.Result(time.Since(start), enqueueTime)
statResult.Log(level.Debug(log))

status := "200"
Expand Down
6 changes: 5 additions & 1 deletion pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
)

var (
Expand Down Expand Up @@ -2010,16 +2011,19 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it
func TestEngine_Stats(t *testing.T) {
eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits)

enqueueTime := 2 * time.Millisecond
q := eng.Query(LiteralParams{
qs: `{foo="bar"}`,
start: time.Now(),
end: time.Now(),
direction: logproto.BACKWARD,
limit: 1000,
})
r, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
ctx := context.WithValue(context.Background(), httpreq.QueryEnqueueTimeHTTPHeader, enqueueTime)
r, err := q.Exec(user.InjectOrgID(ctx, "fake"))
require.NoError(t, err)
require.Equal(t, int64(1), r.Statistics.TotalDecompressedBytes())
require.Equal(t, enqueueTime.Seconds(), r.Statistics.Summary.EnqueueTime)
}

type errorIteratorQuerier struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"enqueue_time", time.Duration(int64(stats.Summary.EnqueueTime * float64(time.Second))),
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ func TestLogSlowQuery(t *testing.T) {
}, "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
EnqueueTime: 0.2,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB source=logvolhist feature=beta\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB enqueue_time=200ms source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
13 changes: 9 additions & 4 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *Context) Reset() {
}

// Result calculates the summary based on store and ingester data.
func (c *Context) Result(execTime time.Duration) Result {
func (c *Context) Result(execTime time.Duration, enqueueTime time.Duration) Result {
r := c.result

r.Merge(Result{
Expand All @@ -101,7 +101,7 @@ func (c *Context) Result(execTime time.Duration) Result {
Ingester: c.ingester,
})

r.ComputeSummary(execTime)
r.ComputeSummary(execTime, enqueueTime)

return r
}
Expand All @@ -125,7 +125,7 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
}

// ComputeSummary compute the summary of the statistics.
func (r *Result) ComputeSummary(execTime time.Duration) {
func (r *Result) ComputeSummary(execTime time.Duration, enqueueTime time.Duration) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
Expand All @@ -139,6 +139,9 @@ func (r *Result) ComputeSummary(execTime time.Duration) {
int64(float64(r.Summary.TotalLinesProcessed) /
execTime.Seconds())
}
if enqueueTime != 0 {
r.Summary.EnqueueTime = enqueueTime.Seconds()
}
}

func (s *Store) Merge(m Store) {
Expand Down Expand Up @@ -168,7 +171,8 @@ func (i *Ingester) Merge(m Ingester) {
func (r *Result) Merge(m Result) {
r.Querier.Merge(m.Querier)
r.Ingester.Merge(m.Ingester)
r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime + m.Summary.ExecTime) * float64(time.Second))))
r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime+m.Summary.ExecTime)*float64(time.Second))),
time.Duration(int64((r.Summary.EnqueueTime+m.Summary.EnqueueTime)*float64(time.Second))))
}

func (r Result) ChunksDownloadTime() time.Duration {
Expand Down Expand Up @@ -281,5 +285,6 @@ func (s Summary) Log(log log.Logger) {
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", s.TotalLinesProcessed,
"Summary.ExecTime", time.Duration(int64(s.ExecTime*float64(time.Second))),
"Summary.EnqueueTime", time.Duration(int64(s.EnqueueTime*float64(time.Second))),
)
}
12 changes: 8 additions & 4 deletions pkg/logqlmodel/stats/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestResult(t *testing.T) {
fakeIngesterQuery(ctx)
fakeIngesterQuery(ctx)

res := stats.Result(2 * time.Second)
res := stats.Result(2*time.Second, 2*time.Millisecond)
res.Log(util_log.Logger)
expected := Result{
Ingester: Ingester{
Expand Down Expand Up @@ -61,6 +61,7 @@ func TestResult(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
EnqueueTime: 2 * time.Millisecond.Seconds(),
BytesProcessedPerSecond: int64(42),
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestSnapshot_JoinResults(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
EnqueueTime: 2 * time.Millisecond.Seconds(),
BytesProcessedPerSecond: int64(42),
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
Expand All @@ -114,7 +116,7 @@ func TestSnapshot_JoinResults(t *testing.T) {
}

JoinResults(ctx, expected)
res := statsCtx.Result(2 * time.Second)
res := statsCtx.Result(2*time.Second, 2*time.Millisecond)
require.Equal(t, expected, res)
}

Expand Down Expand Up @@ -177,6 +179,7 @@ func TestResult_Merge(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
EnqueueTime: 2 * time.Millisecond.Seconds(),
BytesProcessedPerSecond: int64(42),
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
Expand Down Expand Up @@ -223,6 +226,7 @@ func TestResult_Merge(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * 2 * time.Second.Seconds(),
EnqueueTime: 2 * 2 * time.Millisecond.Seconds(),
BytesProcessedPerSecond: int64(42), // 2 requests at the same pace should give the same bytes/lines per sec
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: 2 * int64(84),
Expand All @@ -234,10 +238,10 @@ func TestResult_Merge(t *testing.T) {
func TestReset(t *testing.T) {
statsCtx, ctx := NewContext(context.Background())
fakeIngesterQuery(ctx)
res := statsCtx.Result(2 * time.Second)
res := statsCtx.Result(2*time.Second, 2*time.Millisecond)
require.NotEmpty(t, res)
statsCtx.Reset()
res = statsCtx.Result(0)
res = statsCtx.Result(0, 0)
require.Empty(t, res)
}

Expand Down
Loading

0 comments on commit 3b11ac8

Please sign in to comment.