From 3b11ac8f487646d6e9cc6f01a019551611dd3428 Mon Sep 17 00:00:00 2001 From: Susana Ferreira Date: Thu, 30 Dec 2021 15:53:48 +0000 Subject: [PATCH] Add query enqueue time to metrics.go --- CHANGELOG.md | 1 + docs/sources/api/_index.md | 1 + pkg/chunkenc/memchunk_test.go | 4 +- pkg/iter/entry_iterator_test.go | 2 +- pkg/logql/engine.go | 5 +- pkg/logql/engine_test.go | 6 +- pkg/logql/metrics.go | 1 + pkg/logql/metrics_test.go | 3 +- pkg/logqlmodel/stats/context.go | 13 +- pkg/logqlmodel/stats/context_test.go | 12 +- pkg/logqlmodel/stats/stats.pb.go | 125 +++++++++++++------- pkg/logqlmodel/stats/stats.proto | 2 + pkg/loki/modules.go | 10 +- pkg/querier/queryrange/codec_test.go | 18 +-- pkg/querier/queryrange/downstreamer_test.go | 10 +- pkg/querier/queryrange/prometheus_test.go | 1 + pkg/querier/queryrange/stats.go | 5 +- pkg/scheduler/scheduler.go | 13 +- pkg/storage/batch_test.go | 2 +- pkg/util/httpreq/tags.go | 21 ++++ pkg/util/httpreq/tags_test.go | 42 +++++++ pkg/util/marshal/legacy/marshal_test.go | 1 + pkg/util/marshal/marshal_test.go | 3 + 23 files changed, 220 insertions(+), 81 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13fc23103d350..e40748d48f50e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [4949](https://github.com/grafana/loki/pull/4949) **ssncferreira**: Add query `enqueueTime` metric to statistics and metrics.go * [4993](https://github.com/grafana/loki/pull/4926) **thejosephstevens**: Fix parent of wal and wal_cleaner in loki ruler config docs * [4933](https://github.com/grafana/loki/pull/4933) **jeschkies**: Support matchers in series label values query. * [4926](https://github.com/grafana/loki/pull/4926) **thejosephstevens**: Fix comment in Loki module loading for accuracy diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index c4346df3a6fdf..631d8f0a4082a 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -927,6 +927,7 @@ The example belows show all possible statistics returned with their respective d }, "summary": { "bytesProcessedPerSecond": 0, // Total of bytes processed per second + "enqueueTime": 0, // Total enqueue time in seconds (float) "execTime": 0, // Total execution time in seconds (float) "linesProcessedPerSecond": 0, // Total lines processed per second "totalBytesProcessed":0, // Total amount of bytes processed overall for this request diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index bc99644c14509..af4cf2479064d 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -567,7 +567,7 @@ func TestChunkStats(t *testing.T) { t.Fatal(err) } // test on a chunk filling up - s := statsCtx.Result(time.Since(first)) + s := statsCtx.Result(time.Since(first), 0) require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed) require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed) @@ -594,7 +594,7 @@ func TestChunkStats(t *testing.T) { if err := it.Close(); err != nil { t.Fatal(err) } - s = statsCtx.Result(time.Since(first)) + s = statsCtx.Result(time.Since(first), 0) require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed) require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed) diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index c381453549385..459c39d6b3e73 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -550,7 +550,7 @@ func Test_DuplicateCount(t *testing.T) { defer it.Close() for it.Next() { } - require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0).TotalDuplicates()) + require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0, 0).TotalDuplicates()) }) } } diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index dec8564b3f487..c34f4b0875afd 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/httpreq" ) var ( @@ -119,7 +120,9 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { data, err := q.Eval(ctx) - statResult := statsCtx.Result(time.Since(start)) + enqueueTime, _ := ctx.Value(httpreq.QueryEnqueueTimeHTTPHeader).(time.Duration) + + statResult := statsCtx.Result(time.Since(start), enqueueTime) statResult.Log(level.Debug(log)) status := "200" diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index ce0f8d6e44cfc..973175a1abaee 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/httpreq" ) var ( @@ -2010,6 +2011,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it func TestEngine_Stats(t *testing.T) { eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits) + enqueueTime := 2 * time.Millisecond q := eng.Query(LiteralParams{ qs: `{foo="bar"}`, start: time.Now(), @@ -2017,9 +2019,11 @@ func TestEngine_Stats(t *testing.T) { direction: logproto.BACKWARD, limit: 1000, }) - r, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) + ctx := context.WithValue(context.Background(), httpreq.QueryEnqueueTimeHTTPHeader, enqueueTime) + r, err := q.Exec(user.InjectOrgID(ctx, "fake")) require.NoError(t, err) require.Equal(t, int64(1), r.Statistics.TotalDecompressedBytes()) + require.Equal(t, enqueueTime.Seconds(), r.Statistics.Summary.EnqueueTime) } type errorIteratorQuerier struct { diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 706784cd6c9d5..bb51635a863e0 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -106,6 +106,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res "returned_lines", returnedLines, "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), + "enqueue_time", time.Duration(int64(stats.Summary.EnqueueTime * float64(time.Second))), }...) logValues = append(logValues, tagsToKeyValues(queryTags)...) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 4550ae22ced90..db501ef1730bc 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -76,13 +76,14 @@ func TestLogSlowQuery(t *testing.T) { }, "200", stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 100000, + EnqueueTime: 0.2, ExecTime: 25.25, TotalBytesProcessed: 100000, }, }, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}) require.Equal(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB source=logvolhist feature=beta\n", + "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB enqueue_time=200ms source=logvolhist feature=beta\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 024de026fb5a5..18144e9ac965b 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -91,7 +91,7 @@ func (c *Context) Reset() { } // Result calculates the summary based on store and ingester data. -func (c *Context) Result(execTime time.Duration) Result { +func (c *Context) Result(execTime time.Duration, enqueueTime time.Duration) Result { r := c.result r.Merge(Result{ @@ -101,7 +101,7 @@ func (c *Context) Result(execTime time.Duration) Result { Ingester: c.ingester, }) - r.ComputeSummary(execTime) + r.ComputeSummary(execTime, enqueueTime) return r } @@ -125,7 +125,7 @@ func JoinIngesters(ctx context.Context, inc Ingester) { } // ComputeSummary compute the summary of the statistics. -func (r *Result) ComputeSummary(execTime time.Duration) { +func (r *Result) ComputeSummary(execTime time.Duration, enqueueTime time.Duration) { r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes + r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines + @@ -139,6 +139,9 @@ func (r *Result) ComputeSummary(execTime time.Duration) { int64(float64(r.Summary.TotalLinesProcessed) / execTime.Seconds()) } + if enqueueTime != 0 { + r.Summary.EnqueueTime = enqueueTime.Seconds() + } } func (s *Store) Merge(m Store) { @@ -168,7 +171,8 @@ func (i *Ingester) Merge(m Ingester) { func (r *Result) Merge(m Result) { r.Querier.Merge(m.Querier) r.Ingester.Merge(m.Ingester) - r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime + m.Summary.ExecTime) * float64(time.Second)))) + r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime+m.Summary.ExecTime)*float64(time.Second))), + time.Duration(int64((r.Summary.EnqueueTime+m.Summary.EnqueueTime)*float64(time.Second)))) } func (r Result) ChunksDownloadTime() time.Duration { @@ -281,5 +285,6 @@ func (s Summary) Log(log log.Logger) { "Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)), "Summary.TotalLinesProcessed", s.TotalLinesProcessed, "Summary.ExecTime", time.Duration(int64(s.ExecTime*float64(time.Second))), + "Summary.EnqueueTime", time.Duration(int64(s.EnqueueTime*float64(time.Second))), ) } diff --git a/pkg/logqlmodel/stats/context_test.go b/pkg/logqlmodel/stats/context_test.go index 47552b4e90585..422a2d0ac1d28 100644 --- a/pkg/logqlmodel/stats/context_test.go +++ b/pkg/logqlmodel/stats/context_test.go @@ -25,7 +25,7 @@ func TestResult(t *testing.T) { fakeIngesterQuery(ctx) fakeIngesterQuery(ctx) - res := stats.Result(2 * time.Second) + res := stats.Result(2*time.Second, 2*time.Millisecond) res.Log(util_log.Logger) expected := Result{ Ingester: Ingester{ @@ -61,6 +61,7 @@ func TestResult(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * time.Second.Seconds(), + EnqueueTime: 2 * time.Millisecond.Seconds(), BytesProcessedPerSecond: int64(42), LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), @@ -106,6 +107,7 @@ func TestSnapshot_JoinResults(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * time.Second.Seconds(), + EnqueueTime: 2 * time.Millisecond.Seconds(), BytesProcessedPerSecond: int64(42), LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), @@ -114,7 +116,7 @@ func TestSnapshot_JoinResults(t *testing.T) { } JoinResults(ctx, expected) - res := statsCtx.Result(2 * time.Second) + res := statsCtx.Result(2*time.Second, 2*time.Millisecond) require.Equal(t, expected, res) } @@ -177,6 +179,7 @@ func TestResult_Merge(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * time.Second.Seconds(), + EnqueueTime: 2 * time.Millisecond.Seconds(), BytesProcessedPerSecond: int64(42), LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), @@ -223,6 +226,7 @@ func TestResult_Merge(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * 2 * time.Second.Seconds(), + EnqueueTime: 2 * 2 * time.Millisecond.Seconds(), BytesProcessedPerSecond: int64(42), // 2 requests at the same pace should give the same bytes/lines per sec LinesProcessedPerSecond: int64(50), TotalBytesProcessed: 2 * int64(84), @@ -234,10 +238,10 @@ func TestResult_Merge(t *testing.T) { func TestReset(t *testing.T) { statsCtx, ctx := NewContext(context.Background()) fakeIngesterQuery(ctx) - res := statsCtx.Result(2 * time.Second) + res := statsCtx.Result(2*time.Second, 2*time.Millisecond) require.NotEmpty(t, res) statsCtx.Reset() - res = statsCtx.Result(0) + res = statsCtx.Result(0, 0) require.Empty(t, res) } diff --git a/pkg/logqlmodel/stats/stats.pb.go b/pkg/logqlmodel/stats/stats.pb.go index 7538f6053b489..2e086fcbd82bf 100644 --- a/pkg/logqlmodel/stats/stats.pb.go +++ b/pkg/logqlmodel/stats/stats.pb.go @@ -98,6 +98,8 @@ type Summary struct { TotalLinesProcessed int64 `protobuf:"varint,4,opt,name=totalLinesProcessed,proto3" json:"totalLinesProcessed"` // Execution time in seconds. ExecTime float64 `protobuf:"fixed64,5,opt,name=execTime,proto3" json:"execTime"` + // Enqueue time in seconds. + EnqueueTime float64 `protobuf:"fixed64,6,opt,name=enqueueTime,proto3" json:"enqueueTime"` } func (m *Summary) Reset() { *m = Summary{} } @@ -167,6 +169,13 @@ func (m *Summary) GetExecTime() float64 { return 0 } +func (m *Summary) GetEnqueueTime() float64 { + if m != nil { + return m.EnqueueTime + } + return 0 +} + type Querier struct { Store Store `protobuf:"bytes,1,opt,name=store,proto3" json:"store"` } @@ -460,51 +469,52 @@ func init() { func init() { proto.RegisterFile("pkg/logqlmodel/stats/stats.proto", fileDescriptor_6cdfe5d2aea33ebb) } var fileDescriptor_6cdfe5d2aea33ebb = []byte{ - // 693 bytes of a gzipped FileDescriptorProto + // 714 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xbf, 0x6f, 0xd3, 0x40, - 0x14, 0xb6, 0x13, 0xdc, 0x84, 0xa3, 0xb4, 0xe5, 0xaa, 0xd2, 0x00, 0x92, 0x5d, 0x65, 0xea, 0x00, - 0x8d, 0xf8, 0xb1, 0x80, 0xe8, 0xe2, 0x56, 0x48, 0x95, 0x40, 0x94, 0x57, 0x58, 0xd8, 0x1c, 0xe7, - 0x9a, 0x58, 0x75, 0x7c, 0xa9, 0x7f, 0x08, 0xba, 0xb1, 0x31, 0xc2, 0x9f, 0xc1, 0xc2, 0x9f, 0xc0, - 0xde, 0xb1, 0x63, 0x27, 0x8b, 0xba, 0x0b, 0xf2, 0xd4, 0x8d, 0x15, 0xf9, 0x9d, 0x63, 0xd7, 0x17, - 0x47, 0x62, 0xb1, 0xef, 0x7d, 0xdf, 0xfb, 0xde, 0x3b, 0xbf, 0xef, 0xac, 0x23, 0x1b, 0x93, 0xa3, - 0x61, 0xcf, 0xe5, 0xc3, 0x63, 0x77, 0xcc, 0x07, 0xcc, 0xed, 0x05, 0xa1, 0x15, 0x06, 0xe2, 0xb9, - 0x35, 0xf1, 0x79, 0xc8, 0xa9, 0x86, 0xc1, 0xfd, 0x47, 0x43, 0x27, 0x1c, 0x45, 0xfd, 0x2d, 0x9b, - 0x8f, 0x7b, 0x43, 0x3e, 0xe4, 0x3d, 0x64, 0xfb, 0xd1, 0x21, 0x46, 0x18, 0xe0, 0x4a, 0xa8, 0xba, - 0xbf, 0x54, 0xb2, 0x00, 0x2c, 0x88, 0xdc, 0x90, 0x3e, 0x27, 0xad, 0x20, 0x1a, 0x8f, 0x2d, 0xff, - 0xa4, 0xa3, 0x6e, 0xa8, 0x9b, 0xb7, 0x9e, 0x2c, 0x6d, 0x89, 0xfa, 0x07, 0x02, 0x35, 0x97, 0x4f, - 0x63, 0x43, 0x49, 0x63, 0x63, 0x9a, 0x06, 0xd3, 0x45, 0x26, 0x3d, 0x8e, 0x98, 0xef, 0x30, 0xbf, - 0xd3, 0xa8, 0x48, 0xdf, 0x09, 0xb4, 0x94, 0xe6, 0x69, 0x30, 0x5d, 0xd0, 0x6d, 0xd2, 0x76, 0xbc, - 0x21, 0x0b, 0x42, 0xe6, 0x77, 0x9a, 0xa8, 0x5d, 0xce, 0xb5, 0x7b, 0x39, 0x6c, 0xae, 0xe4, 0xe2, - 0x22, 0x11, 0x8a, 0x55, 0xf7, 0x6f, 0x83, 0xb4, 0xf2, 0xfd, 0xd1, 0x0f, 0x64, 0xbd, 0x7f, 0x12, - 0xb2, 0x60, 0xdf, 0xe7, 0x36, 0x0b, 0x02, 0x36, 0xd8, 0x67, 0xfe, 0x01, 0xb3, 0xb9, 0x37, 0xc0, - 0x0f, 0x6a, 0x9a, 0x0f, 0xd2, 0xd8, 0x98, 0x97, 0x02, 0xf3, 0x88, 0xac, 0xac, 0xeb, 0x78, 0xb5, - 0x65, 0x1b, 0x65, 0xd9, 0x39, 0x29, 0x30, 0x8f, 0xa0, 0x7b, 0x64, 0x35, 0xe4, 0xa1, 0xe5, 0x9a, - 0x95, 0xb6, 0x38, 0x83, 0xa6, 0xb9, 0x9e, 0xc6, 0x46, 0x1d, 0x0d, 0x75, 0x60, 0x51, 0xea, 0x75, - 0xa5, 0x55, 0xe7, 0x86, 0x54, 0xaa, 0x4a, 0x43, 0x1d, 0x48, 0x37, 0x49, 0x9b, 0x7d, 0x66, 0xf6, - 0x7b, 0x67, 0xcc, 0x3a, 0xda, 0x86, 0xba, 0xa9, 0x9a, 0x8b, 0xd9, 0xe4, 0xa7, 0x18, 0x14, 0xab, - 0xee, 0x4b, 0xd2, 0xca, 0xdd, 0xa5, 0x8f, 0x89, 0x16, 0x84, 0xdc, 0x67, 0xf9, 0xb9, 0x59, 0x9c, - 0x9e, 0x9b, 0x0c, 0x33, 0x6f, 0xe7, 0xee, 0x89, 0x14, 0x10, 0xaf, 0xee, 0xcf, 0x06, 0x69, 0x4f, - 0x0d, 0xa6, 0xcf, 0xc8, 0x22, 0xee, 0x05, 0x98, 0x65, 0x8f, 0x98, 0x70, 0x4b, 0x33, 0x57, 0xd2, - 0xd8, 0xa8, 0xe0, 0x50, 0x89, 0xe8, 0x2b, 0x42, 0x31, 0xde, 0x19, 0x45, 0xde, 0x51, 0xf0, 0xc6, - 0x0a, 0x51, 0x2b, 0x2c, 0xb9, 0x9b, 0xc6, 0x46, 0x0d, 0x0b, 0x35, 0x58, 0xd1, 0xdd, 0xc4, 0x38, - 0xc8, 0x1d, 0x28, 0xbb, 0xe7, 0x38, 0x54, 0x22, 0xfa, 0x82, 0x2c, 0x95, 0xf3, 0x3b, 0x60, 0x5e, - 0x98, 0x8f, 0x9b, 0xa6, 0xb1, 0x21, 0x31, 0x20, 0xc5, 0xe5, 0xbc, 0xb4, 0xff, 0x9e, 0xd7, 0xb7, - 0x06, 0xd1, 0x90, 0x2f, 0x1a, 0x8b, 0x8f, 0x00, 0x76, 0x98, 0x1f, 0xee, 0xb2, 0x71, 0xc1, 0x80, - 0x14, 0xd3, 0xb7, 0x64, 0xed, 0x1a, 0xb2, 0xcb, 0x3f, 0x79, 0x2e, 0xb7, 0x06, 0xc5, 0xd4, 0xee, - 0xa5, 0xb1, 0x51, 0x9f, 0x00, 0xf5, 0x70, 0xe6, 0x81, 0x5d, 0xc1, 0xf0, 0xe0, 0x34, 0x4b, 0x0f, - 0x66, 0x59, 0xa8, 0xc1, 0xb2, 0x89, 0x20, 0x8a, 0x43, 0x2c, 0x27, 0x82, 0xfd, 0xca, 0x89, 0x60, - 0x0a, 0x88, 0x57, 0xf7, 0x6b, 0x93, 0x68, 0xc8, 0x67, 0x13, 0x19, 0x31, 0x6b, 0x20, 0x92, 0xb3, - 0x3f, 0xe3, 0xba, 0x15, 0x55, 0x06, 0xa4, 0xb8, 0xa2, 0x45, 0x83, 0xd0, 0x13, 0x59, 0x8b, 0x0c, - 0x48, 0x31, 0xdd, 0x21, 0x77, 0x06, 0xcc, 0xe6, 0xe3, 0x89, 0x8f, 0xff, 0x8e, 0x68, 0xbd, 0x80, - 0xf2, 0xb5, 0x34, 0x36, 0x66, 0x49, 0x98, 0x85, 0xe4, 0x22, 0x62, 0x0f, 0xad, 0xfa, 0x22, 0x62, - 0x1b, 0xb3, 0x10, 0xdd, 0x26, 0xcb, 0xf2, 0x3e, 0xda, 0x58, 0x62, 0x35, 0x8d, 0x0d, 0x99, 0x02, - 0x19, 0xc8, 0xe4, 0x68, 0xef, 0x6e, 0x34, 0x71, 0x1d, 0xdb, 0xca, 0xe4, 0x37, 0x4b, 0xb9, 0x44, - 0x81, 0x0c, 0x98, 0xfd, 0xb3, 0x0b, 0x5d, 0x39, 0xbf, 0xd0, 0x95, 0xab, 0x0b, 0x5d, 0xfd, 0x92, - 0xe8, 0xea, 0x8f, 0x44, 0x57, 0x4f, 0x13, 0x5d, 0x3d, 0x4b, 0x74, 0xf5, 0x77, 0xa2, 0xab, 0x7f, - 0x12, 0x5d, 0xb9, 0x4a, 0x74, 0xf5, 0xfb, 0xa5, 0xae, 0x9c, 0x5d, 0xea, 0xca, 0xf9, 0xa5, 0xae, - 0x7c, 0x7c, 0x78, 0xfd, 0xa2, 0xf2, 0xad, 0x43, 0xcb, 0xb3, 0x7a, 0x2e, 0x3f, 0x72, 0x7a, 0x75, - 0x37, 0x5d, 0x7f, 0x01, 0xaf, 0xab, 0xa7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2e, 0xbd, 0xac, - 0xdd, 0x08, 0x07, 0x00, 0x00, + 0x14, 0xb6, 0x13, 0x9c, 0x84, 0x6b, 0x69, 0xca, 0x55, 0xa5, 0x01, 0x24, 0xbb, 0xca, 0xd4, 0x01, + 0x1a, 0x15, 0x58, 0x40, 0x74, 0x71, 0x2b, 0xa4, 0x4a, 0x20, 0xca, 0x2b, 0x2c, 0x6c, 0x8e, 0x73, + 0x4d, 0xac, 0x3a, 0xbe, 0xd4, 0x3f, 0x04, 0xdd, 0xd8, 0x18, 0xe1, 0xcf, 0x40, 0x48, 0xfc, 0x09, + 0xec, 0x1d, 0x3b, 0x76, 0xb2, 0xa8, 0xbb, 0x20, 0x4f, 0xfd, 0x13, 0x90, 0xdf, 0x39, 0x76, 0xec, + 0x38, 0x12, 0x4b, 0x72, 0xdf, 0xf7, 0xbd, 0xef, 0xbd, 0xf3, 0x7b, 0x77, 0x3a, 0xb2, 0x39, 0x39, + 0x19, 0xf6, 0x6c, 0x3e, 0x3c, 0xb5, 0xc7, 0x7c, 0xc0, 0xec, 0x9e, 0xe7, 0x1b, 0xbe, 0x27, 0x7e, + 0xb7, 0x27, 0x2e, 0xf7, 0x39, 0x55, 0x10, 0x3c, 0x78, 0x3c, 0xb4, 0xfc, 0x51, 0xd0, 0xdf, 0x36, + 0xf9, 0xb8, 0x37, 0xe4, 0x43, 0xde, 0x43, 0xb5, 0x1f, 0x1c, 0x23, 0x42, 0x80, 0x2b, 0xe1, 0xea, + 0xfe, 0x96, 0x49, 0x03, 0x98, 0x17, 0xd8, 0x3e, 0x7d, 0x4e, 0x9a, 0x5e, 0x30, 0x1e, 0x1b, 0xee, + 0x59, 0x47, 0xde, 0x94, 0xb7, 0x96, 0x9e, 0xac, 0x6c, 0x8b, 0xfc, 0x47, 0x82, 0xd5, 0xdb, 0xe7, + 0xa1, 0x26, 0xc5, 0xa1, 0x36, 0x0d, 0x83, 0xe9, 0x22, 0xb1, 0x9e, 0x06, 0xcc, 0xb5, 0x98, 0xdb, + 0xa9, 0x15, 0xac, 0xef, 0x04, 0x9b, 0x5b, 0xd3, 0x30, 0x98, 0x2e, 0xe8, 0x2e, 0x69, 0x59, 0xce, + 0x90, 0x79, 0x3e, 0x73, 0x3b, 0x75, 0xf4, 0xb6, 0x53, 0xef, 0x41, 0x4a, 0xeb, 0xab, 0xa9, 0x39, + 0x0b, 0x84, 0x6c, 0xd5, 0xfd, 0x59, 0x27, 0xcd, 0x74, 0x7f, 0xf4, 0x03, 0xd9, 0xe8, 0x9f, 0xf9, + 0xcc, 0x3b, 0x74, 0xb9, 0xc9, 0x3c, 0x8f, 0x0d, 0x0e, 0x99, 0x7b, 0xc4, 0x4c, 0xee, 0x0c, 0xf0, + 0x83, 0xea, 0xfa, 0xc3, 0x38, 0xd4, 0x16, 0x85, 0xc0, 0x22, 0x21, 0x49, 0x6b, 0x5b, 0x4e, 0x65, + 0xda, 0x5a, 0x9e, 0x76, 0x41, 0x08, 0x2c, 0x12, 0xe8, 0x01, 0x59, 0xf3, 0xb9, 0x6f, 0xd8, 0x7a, + 0xa1, 0x2c, 0xf6, 0xa0, 0xae, 0x6f, 0xc4, 0xa1, 0x56, 0x25, 0x43, 0x15, 0x99, 0xa5, 0x7a, 0x5d, + 0x28, 0xd5, 0xb9, 0x55, 0x4a, 0x55, 0x94, 0xa1, 0x8a, 0xa4, 0x5b, 0xa4, 0xc5, 0x3e, 0x33, 0xf3, + 0xbd, 0x35, 0x66, 0x1d, 0x65, 0x53, 0xde, 0x92, 0xf5, 0xe5, 0xa4, 0xf3, 0x53, 0x0e, 0xb2, 0x15, + 0xdd, 0x21, 0x4b, 0xcc, 0x39, 0x0d, 0x58, 0xc0, 0x30, 0xb8, 0x81, 0xc1, 0xed, 0x38, 0xd4, 0x66, + 0x69, 0x98, 0x05, 0xdd, 0x97, 0xa4, 0x99, 0x1e, 0x08, 0xba, 0x43, 0x14, 0xcf, 0xe7, 0x2e, 0x4b, + 0x8f, 0xda, 0xf2, 0xf4, 0xa8, 0x25, 0x9c, 0x7e, 0x27, 0x1d, 0xb8, 0x08, 0x01, 0xf1, 0xd7, 0xfd, + 0x55, 0x23, 0xad, 0xe9, 0x99, 0xa0, 0xcf, 0xc8, 0x32, 0x6e, 0x1f, 0x98, 0x61, 0x8e, 0x98, 0x18, + 0xb0, 0xa2, 0xaf, 0xc6, 0xa1, 0x56, 0xe0, 0xa1, 0x80, 0xe8, 0x2b, 0x42, 0x11, 0xef, 0x8d, 0x02, + 0xe7, 0xc4, 0x7b, 0x63, 0xf8, 0xe8, 0x15, 0x53, 0xbc, 0x17, 0x87, 0x5a, 0x85, 0x0a, 0x15, 0x5c, + 0x56, 0x5d, 0x47, 0xec, 0xa5, 0x43, 0xcb, 0xab, 0xa7, 0x3c, 0x14, 0x10, 0x7d, 0x41, 0x56, 0xf2, + 0x96, 0x1f, 0x31, 0xc7, 0x4f, 0x27, 0x44, 0xe3, 0x50, 0x2b, 0x29, 0x50, 0xc2, 0x79, 0xbf, 0x94, + 0xff, 0xee, 0xd7, 0xb7, 0x1a, 0x51, 0x50, 0xcf, 0x0a, 0x8b, 0x8f, 0x00, 0x76, 0x9c, 0xde, 0x87, + 0xbc, 0x70, 0xa6, 0x40, 0x09, 0xd3, 0xb7, 0x64, 0x7d, 0x86, 0xd9, 0xe7, 0x9f, 0x1c, 0x9b, 0x1b, + 0x83, 0xac, 0x6b, 0xf7, 0xe3, 0x50, 0xab, 0x0e, 0x80, 0x6a, 0x3a, 0x99, 0x81, 0x59, 0xe0, 0xf0, + 0xf8, 0xd4, 0xf3, 0x19, 0xcc, 0xab, 0x50, 0xc1, 0x25, 0x1d, 0x41, 0x16, 0x9b, 0x98, 0x77, 0x04, + 0xeb, 0xe5, 0x1d, 0xc1, 0x10, 0x10, 0x7f, 0xdd, 0xaf, 0x75, 0xa2, 0xa0, 0x9e, 0x74, 0x64, 0xc4, + 0x8c, 0x81, 0x08, 0x4e, 0x2e, 0xd3, 0xec, 0x28, 0x8a, 0x0a, 0x94, 0x70, 0xc1, 0x8b, 0x03, 0xc2, + 0x99, 0x94, 0xbd, 0xa8, 0x40, 0x09, 0xd3, 0x3d, 0x72, 0x77, 0xc0, 0x4c, 0x3e, 0x9e, 0xb8, 0x78, + 0xdd, 0x44, 0xe9, 0x06, 0xda, 0xd7, 0xe3, 0x50, 0x9b, 0x17, 0x61, 0x9e, 0x2a, 0x27, 0x11, 0x7b, + 0x68, 0x56, 0x27, 0x11, 0xdb, 0x98, 0xa7, 0xe8, 0x2e, 0x69, 0x97, 0xf7, 0xd1, 0xc2, 0x14, 0x6b, + 0x71, 0xa8, 0x95, 0x25, 0x28, 0x13, 0x89, 0x1d, 0xc7, 0xbb, 0x1f, 0x4c, 0x6c, 0xcb, 0x34, 0x12, + 0xfb, 0xed, 0xdc, 0x5e, 0x92, 0xa0, 0x4c, 0xe8, 0xfd, 0x8b, 0x2b, 0x55, 0xba, 0xbc, 0x52, 0xa5, + 0x9b, 0x2b, 0x55, 0xfe, 0x12, 0xa9, 0xf2, 0x8f, 0x48, 0x95, 0xcf, 0x23, 0x55, 0xbe, 0x88, 0x54, + 0xf9, 0x4f, 0xa4, 0xca, 0x7f, 0x23, 0x55, 0xba, 0x89, 0x54, 0xf9, 0xfb, 0xb5, 0x2a, 0x5d, 0x5c, + 0xab, 0xd2, 0xe5, 0xb5, 0x2a, 0x7d, 0x7c, 0x34, 0xfb, 0xb6, 0xb9, 0xc6, 0xb1, 0xe1, 0x18, 0x3d, + 0x9b, 0x9f, 0x58, 0xbd, 0xaa, 0xc7, 0xb1, 0xdf, 0xc0, 0x17, 0xee, 0xe9, 0xbf, 0x00, 0x00, 0x00, + 0xff, 0xff, 0xc5, 0x7a, 0x6e, 0xca, 0x3b, 0x07, 0x00, 0x00, } func (this *Result) Equal(that interface{}) bool { @@ -571,6 +581,9 @@ func (this *Summary) Equal(that interface{}) bool { if this.ExecTime != that1.ExecTime { return false } + if this.EnqueueTime != that1.EnqueueTime { + return false + } return true } func (this *Querier) Equal(that interface{}) bool { @@ -721,13 +734,14 @@ func (this *Summary) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 10) s = append(s, "&stats.Summary{") s = append(s, "BytesProcessedPerSecond: "+fmt.Sprintf("%#v", this.BytesProcessedPerSecond)+",\n") s = append(s, "LinesProcessedPerSecond: "+fmt.Sprintf("%#v", this.LinesProcessedPerSecond)+",\n") s = append(s, "TotalBytesProcessed: "+fmt.Sprintf("%#v", this.TotalBytesProcessed)+",\n") s = append(s, "TotalLinesProcessed: "+fmt.Sprintf("%#v", this.TotalLinesProcessed)+",\n") s = append(s, "ExecTime: "+fmt.Sprintf("%#v", this.ExecTime)+",\n") + s = append(s, "EnqueueTime: "+fmt.Sprintf("%#v", this.EnqueueTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -864,6 +878,12 @@ func (m *Summary) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.EnqueueTime != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.EnqueueTime)))) + i-- + dAtA[i] = 0x31 + } if m.ExecTime != 0 { i -= 8 encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ExecTime)))) @@ -1127,6 +1147,9 @@ func (m *Summary) Size() (n int) { if m.ExecTime != 0 { n += 9 } + if m.EnqueueTime != 0 { + n += 9 + } return n } @@ -1239,6 +1262,7 @@ func (this *Summary) String() string { `TotalBytesProcessed:` + fmt.Sprintf("%v", this.TotalBytesProcessed) + `,`, `TotalLinesProcessed:` + fmt.Sprintf("%v", this.TotalLinesProcessed) + `,`, `ExecTime:` + fmt.Sprintf("%v", this.ExecTime) + `,`, + `EnqueueTime:` + fmt.Sprintf("%v", this.EnqueueTime) + `,`, `}`, }, "") return s @@ -1571,6 +1595,17 @@ func (m *Summary) Unmarshal(dAtA []byte) error { v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) iNdEx += 8 m.ExecTime = float64(math.Float64frombits(v)) + case 6: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field EnqueueTime", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.EnqueueTime = float64(math.Float64frombits(v)) default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/logqlmodel/stats/stats.proto b/pkg/logqlmodel/stats/stats.proto index 783433e4adcf0..a7a29950e1f74 100644 --- a/pkg/logqlmodel/stats/stats.proto +++ b/pkg/logqlmodel/stats/stats.proto @@ -28,6 +28,8 @@ message Summary { int64 totalLinesProcessed = 4 [(gogoproto.jsontag) = "totalLinesProcessed"]; // Execution time in seconds. double execTime = 5 [(gogoproto.jsontag) = "execTime"]; + // Enqueue time in seconds. + double enqueueTime = 6 [(gogoproto.jsontag) = "enqueueTime"]; } message Querier { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b7101f4796dfd..a219d19ddd71f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -233,15 +233,19 @@ func (t *Loki) initQuerier() (services.Service, error) { SchedulerRing: scheduler.SafeReadRing(t.queryScheduler), } + httpMiddleware := middleware.Merge( + httpreq.ExtractQueryMetricsMiddleware(), + ) + queryHandlers := map[string]http.Handler{ - "/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler), - "/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler), + "/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler)), + "/loki/api/v1/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler)), "/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler), - "/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler), + "/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler)), "/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler), "/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), "/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler), diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index a49db3f6b3cd4..8c5a8d07a70f3 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -903,10 +903,11 @@ var ( }, "summary": { "bytesProcessedPerSecond": 20, - "execTime": 21, - "linesProcessedPerSecond": 22, - "totalBytesProcessed": 23, - "totalLinesProcessed": 24 + "enqueueTime": 21, + "execTime": 22, + "linesProcessedPerSecond": 23, + "totalBytesProcessed": 24, + "totalLinesProcessed": 25 } },` matrixString = `{ @@ -1052,10 +1053,11 @@ var ( statsResult = stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 20, - ExecTime: 21, - LinesProcessedPerSecond: 22, - TotalBytesProcessed: 23, - TotalLinesProcessed: 24, + EnqueueTime: 21, + ExecTime: 22, + LinesProcessedPerSecond: 23, + TotalBytesProcessed: 24, + TotalLinesProcessed: 25, }, Querier: stats.Querier{ Store: stats.Store{ diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index c8387dd345969..6283dfbdc29ce 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -120,12 +120,12 @@ func TestResponseToResult(t *testing.T) { }}, }, Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{EnqueueTime: 1, ExecTime: 2}, }, }, expected: logqlmodel.Result{ Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{EnqueueTime: 1, ExecTime: 2}, }, Data: logqlmodel.Streams{{ Labels: `{foo="bar"}`, @@ -144,7 +144,7 @@ func TestResponseToResult(t *testing.T) { desc: "LokiPromResponse", input: &LokiPromResponse{ Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{EnqueueTime: 1, ExecTime: 2}, }, Response: &queryrange.PrometheusResponse{ Data: queryrange.PrometheusData{ @@ -154,7 +154,7 @@ func TestResponseToResult(t *testing.T) { }, expected: logqlmodel.Result{ Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{EnqueueTime: 1, ExecTime: 2}, }, Data: sampleStreamToMatrix(testSampleStreams()), }, @@ -310,7 +310,7 @@ func TestInstanceDownstream(t *testing.T) { }}, }, Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{EnqueueTime: 1, ExecTime: 2}, }, } } diff --git a/pkg/querier/queryrange/prometheus_test.go b/pkg/querier/queryrange/prometheus_test.go index 827dd1de8932d..9c15d25902a80 100644 --- a/pkg/querier/queryrange/prometheus_test.go +++ b/pkg/querier/queryrange/prometheus_test.go @@ -49,6 +49,7 @@ var emptyStats = `"stats": { }, "summary": { "bytesProcessedPerSecond": 0, + "enqueueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 6e15505216b44..4149e1c135b71 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -101,8 +101,9 @@ func StatsCollectorMiddleware() queryrange.Middleware { } } if statistics != nil { - // Re-calculate the summary then log and record metrics for the current query - statistics.ComputeSummary(time.Since(start)) + // Re-calculate the summary: the enqueueTime result is already merged so should not be updated + // Log and record metrics for the current query + statistics.ComputeSummary(time.Since(start), 0) statistics.Log(level.Debug(logger)) } ctxValue := ctx.Value(ctxKey) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f30ef70713765..2582d2fcb7a5a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -38,6 +38,7 @@ import ( lokiutil "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" + lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" ) var ( @@ -463,8 +464,8 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL r := req.(*schedulerRequest) - reqEnqueueTime := time.Since(r.enqueueTime).Seconds() - s.queueDuration.Observe(reqEnqueueTime) + reqEnqueueTime := time.Since(r.enqueueTime) + s.queueDuration.Observe(reqEnqueueTime.Seconds()) r.queueSpan.Finish() reqTenantIDs, err := tenant.TenantIDsFromOrgID(r.userID) @@ -476,7 +477,13 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL reqURL = r.request.Url } level.Info(s.log).Log("msg", "querier request dequeued", "tenant_ids", strings.Join(reqTenantIDs, ", "), - "querier_id", querierID, "query_id", r.queryID, "request", reqURL, "enqueue_time (ms)", reqEnqueueTime*1000) + "querier_id", querierID, "query_id", r.queryID, "request", reqURL, "enqueue_time", reqEnqueueTime) + + // Add HTTP header to the request containing the query enqueue time + r.request.Headers = append(r.request.Headers, &httpgrpc.Header{ + Key: string(lokihttpreq.QueryEnqueueTimeHTTPHeader), + Values: []string{reqEnqueueTime.String()}, + }) /* We want to dequeue the next unexpired request from the chosen tenant queue. diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 98fea8fdf4d1d..9144ef0b5ee49 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1665,7 +1665,7 @@ func Benchmark_store_OverlappingChunks(b *testing.B) { b.Fatal(err) } } - r := statsCtx.Result(time.Since(start)) + r := statsCtx.Result(time.Since(start), 0) b.Log("Total chunks:" + fmt.Sprintf("%d", r.TotalChunksRef())) b.Log("Total bytes decompressed:" + fmt.Sprintf("%d", r.TotalDecompressedBytes())) } diff --git a/pkg/util/httpreq/tags.go b/pkg/util/httpreq/tags.go index 222b37e51c18c..add4890861886 100644 --- a/pkg/util/httpreq/tags.go +++ b/pkg/util/httpreq/tags.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "regexp" + "time" "github.com/weaveworks/common/middleware" ) @@ -16,6 +17,7 @@ var ( QueryTagsHTTPHeader ctxKey = "X-Query-Tags" safeQueryTags = regexp.MustCompile("[^a-zA-Z0-9-=, ]+") // only alpha-numeric, ' ', ',', '=' and `-` + QueryEnqueueTimeHTTPHeader ctxKey = "X-Query-Enqueue-Time" ) func ExtractQueryTagsMiddleware() middleware.Interface { @@ -33,3 +35,22 @@ func ExtractQueryTagsMiddleware() middleware.Interface { }) }) } + +func ExtractQueryMetricsMiddleware() middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + + enqueueTimeHeaders := req.Header[string(QueryEnqueueTimeHTTPHeader)] + if len(enqueueTimeHeaders) > 0 && enqueueTimeHeaders[0] != "" { + enqueueTime, err := time.ParseDuration(enqueueTimeHeaders[0]) + if err == nil { + ctx = context.WithValue(ctx, QueryEnqueueTimeHTTPHeader, enqueueTime) + req = req.WithContext(ctx) + } + } + + next.ServeHTTP(w, req) + }) + }) +} diff --git a/pkg/util/httpreq/tags_test.go b/pkg/util/httpreq/tags_test.go index 09b651a289628..9653c25603e1d 100644 --- a/pkg/util/httpreq/tags_test.go +++ b/pkg/util/httpreq/tags_test.go @@ -4,6 +4,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -49,3 +50,44 @@ func TestQueryTags(t *testing.T) { }) } } + +func TestQueryMetrics(t *testing.T) { + for _, tc := range []struct { + desc string + in string + exp interface{} + error bool + }{ + { + desc: "valid time duration", + in: `2s`, + exp: 2 * time.Second, + }, + { + desc: "empty header", + in: ``, + exp: nil, + }, + { + desc: "invalid time duration", + in: `foo`, + exp: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + req := httptest.NewRequest("GET", "http://testing.com", nil) + req.Header.Set(string(QueryEnqueueTimeHTTPHeader), tc.in) + + w := httptest.NewRecorder() + checked := false + mware := ExtractQueryMetricsMiddleware().Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + require.Equal(t, tc.exp, req.Context().Value(QueryEnqueueTimeHTTPHeader)) + checked = true + })) + + mware.ServeHTTP(w, req) + + assert.True(t, true, checked) + }) + } +} diff --git a/pkg/util/marshal/legacy/marshal_test.go b/pkg/util/marshal/legacy/marshal_test.go index 54dc23571d60c..744fa3b8359c9 100644 --- a/pkg/util/marshal/legacy/marshal_test.go +++ b/pkg/util/marshal/legacy/marshal_test.go @@ -80,6 +80,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "enqueueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index 8e2932361276f..a9e5c1832d860 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -86,6 +86,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "enqueueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, @@ -193,6 +194,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "enqueueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, @@ -317,6 +319,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "enqueueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0,