From 49167faabb1cf15ec2354546596ebac8785da865 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 18 Jan 2022 14:43:39 +0100 Subject: [PATCH 1/3] Reduce split by for large range vector Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/split_by_interval.go | 60 ++++++++++++-- .../queryrange/split_by_interval_test.go | 83 ++++++++++++++++++- 2 files changed, 132 insertions(+), 11 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 3654c96d1861b..783d66ac09e0a 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -12,6 +12,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/tenant" ) @@ -49,7 +50,7 @@ type splitByInterval struct { splitter Splitter } -type Splitter func(req queryrangebase.Request, interval time.Duration) []queryrangebase.Request +type Splitter func(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) // SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval. func SplitByIntervalMiddleware(limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware { @@ -102,7 +103,7 @@ func (h *splitByInterval) Process( } // don't spawn unnecessary goroutines - var p = parallelism + p := parallelism if len(input) < parallelism { p = len(input) } @@ -170,7 +171,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que return h.next.Do(ctx, r) } - intervals := h.splitter(r, interval) + intervals, err := h.splitter(r, interval) + if err != nil { + return nil, err + } h.metrics.splits.Observe(float64(len(intervals))) // no interval should not be processed by the frontend. 
@@ -182,6 +186,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que sp.LogFields(otlog.Int("n_intervals", len(intervals))) } + if len(intervals) == 1 { + return h.next.Do(ctx, intervals[0]) + } + var limit int64 switch req := r.(type) { case *LokiRequest: @@ -213,7 +221,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que return h.merger.MergeResponse(resps...) } -func splitByTime(req queryrangebase.Request, interval time.Duration) []queryrangebase.Request { +func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) { var reqs []queryrangebase.Request switch r := req.(type) { @@ -248,9 +256,9 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) []queryrang }) }) default: - return nil + return nil, nil } - return reqs + return reqs, nil } func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) { @@ -263,8 +271,42 @@ func forInterval(interval time.Duration, start, end time.Time, callback func(sta } } -func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []queryrangebase.Request { +// maxRangeVector returns the maximum range vector duration within a LogQL query. +func maxRangeVector(q string) (time.Duration, error) { + expr, err := logql.ParseSampleExpr(q) + if err != nil { + return 0, err + } + var max time.Duration + expr.Walk(func(e interface{}) { + if r, ok := e.(*logql.LogRange); ok && r.Interval > max { + max = r.Interval + } + }) + return max, nil +} + +// reduceSplitIntervalForRangeVector reduce the split interval for range query based on the range vector. +// Large range vector will be not split into smaller intervals which can cause the queries to be slow by over-processing data. 
+func reduceSplitIntervalForRangeVector(r queryrangebase.Request, interval time.Duration) (time.Duration, error) { + maxRange, err := maxRangeVector(r.GetQuery()) + if err != nil { + return 0, err + } + if maxRange > interval { + return maxRange, nil + } + return interval, nil +} + +func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) { var reqs []queryrangebase.Request + + interval, err := reduceSplitIntervalForRangeVector(r, interval) + if err != nil { + return nil, err + } + lokiReq := r.(*LokiRequest) // step is >= configured split interval, let us just split the query interval by step if lokiReq.Step >= interval.Milliseconds() { @@ -280,7 +322,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []query }) }) - return reqs + return reqs, nil } for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { @@ -298,7 +340,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []query EndTs: end, }) } - return reqs + return reqs, nil } // Round up to the step before the next interval boundary. 
diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index f35932479d938..75fc40e1831fe 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -98,7 +98,9 @@ func Test_splitQuery(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.want, splitByTime(tt.req, tt.interval)) + got, err := splitByTime(tt.req, tt.interval) + require.NoError(t, err) + require.Equal(t, tt.want, got) }) } } @@ -117,12 +119,14 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(0, 60*time.Minute.Nanoseconds()), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(0, 60*time.Minute.Nanoseconds()), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 24 * time.Hour, @@ -132,12 +136,14 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(60*60, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(60*60, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 3 * time.Hour, @@ -147,12 +153,14 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(24*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(24*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 24 * time.Hour, @@ -162,12 +170,14 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(3*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(3*3600, 0), Step: 15 * 
seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 3 * time.Hour, @@ -177,17 +187,20 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(2*24*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix((24*3600)-15, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix((24 * 3600), 0), EndTs: time.Unix((2 * 24 * 3600), 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 24 * time.Hour, @@ -197,17 +210,20 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(2*3*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix((3*3600)-15, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix((3 * 3600), 0), EndTs: time.Unix((2 * 3 * 3600), 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 3 * time.Hour, @@ -217,22 +233,26 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(3*3600, 0), EndTs: time.Unix(3*24*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(3*3600, 0), EndTs: time.Unix((24*3600)-15, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(24*3600, 0), EndTs: time.Unix((2*24*3600)-15, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(2*24*3600, 0), EndTs: time.Unix(3*24*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 24 * time.Hour, @@ -242,22 +262,26 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(2*3600, 0), EndTs: time.Unix(3*3*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ 
&LokiRequest{ StartTs: time.Unix(2*3600, 0), EndTs: time.Unix((3*3600)-15, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(3*3600, 0), EndTs: time.Unix((2*3*3600)-15, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(2*3*3600, 0), EndTs: time.Unix(3*3*3600, 0), Step: 15 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 3 * time.Hour, @@ -269,32 +293,38 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(25*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(6*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(6*3600, 0), EndTs: time.Unix(12*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(12*3600, 0), EndTs: time.Unix(18*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(18*3600, 0), EndTs: time.Unix(24*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, &LokiRequest{ StartTs: time.Unix(24*3600, 0), EndTs: time.Unix(25*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, }, interval: 15 * time.Minute, @@ -304,19 +334,68 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(3*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(3*3600, 0), Step: 6 * 3600 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + }, + interval: 15 * time.Minute, + }, + // the 6h range vector is larger than the 1h split interval, so the query is split by 6h instead of 1h + { + input: &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix(3*3*3600, 0), + Step: 15 * seconds, + Query: `rate({app="foo"}[6h])`, + }, + expected: 
[]queryrangebase.Request{ + &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix((6*3600)-15, 0), + Step: 15 * seconds, + Query: `rate({app="foo"}[6h])`, + }, + &LokiRequest{ + StartTs: time.Unix(6*3600, 0), + EndTs: time.Unix(3*3*3600, 0), + Step: 15 * seconds, + Query: `rate({app="foo"}[6h])`, + }, + }, + interval: 1 * time.Hour, + }, + // range vector too large we don't want to split it + { + input: &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix(3*3*3600, 0), + Step: 15 * seconds, + Query: `rate({app="foo"}[7d])`, + }, + expected: []queryrangebase.Request{ + &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix(3*3*3600, 0), + Step: 15 * seconds, + Query: `rate({app="foo"}[7d])`, }, }, interval: 15 * time.Minute, }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - splits := splitMetricByTime(tc.input, tc.interval) + splits, err := splitMetricByTime(tc.input, tc.interval) + require.NoError(t, err) + for i, s := range splits { + s := s.(*LokiRequest) + t.Logf(" want: %d start:%s end:%s \n", i, s.StartTs, s.EndTs) + } require.Equal(t, tc.expected, splits) }) } From 7de684cdf5bfdea784930a96d7f646616f391037 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 19 Jan 2022 10:51:57 +0100 Subject: [PATCH 2/3] Update pkg/querier/queryrange/split_by_interval.go Co-authored-by: Christian Haudum --- pkg/querier/queryrange/split_by_interval.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 783d66ac09e0a..26ed85a4bb2ce 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -286,8 +286,8 @@ func maxRangeVector(q string) (time.Duration, error) { return max, nil } -// reduceSplitIntervalForRangeVector reduce the split interval for range query based on the range vector. 
-// Large range vector will be not split into smaller intervals which can cause the queries to be slow by over-processing data. +// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector. +// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data. func reduceSplitIntervalForRangeVector(r queryrangebase.Request, interval time.Duration) (time.Duration, error) { maxRange, err := maxRangeVector(r.GetQuery()) if err != nil { From 956dac6ab5367652cbffa4127c3916ca7deb263b Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 19 Jan 2022 10:53:33 +0100 Subject: [PATCH 3/3] improve fn name. Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/split_by_interval.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 783d66ac09e0a..e8990f3e90020 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -271,8 +271,8 @@ func forInterval(interval time.Duration, start, end time.Time, callback func(sta } } -// maxRangeVector returns the maximum range vector duration within a LogQL query. -func maxRangeVector(q string) (time.Duration, error) { +// maxRangeVectorDuration returns the maximum range vector duration within a LogQL query. +func maxRangeVectorDuration(q string) (time.Duration, error) { expr, err := logql.ParseSampleExpr(q) if err != nil { return 0, err @@ -289,7 +289,7 @@ func maxRangeVector(q string) (time.Duration, error) { // reduceSplitIntervalForRangeVector reduce the split interval for range query based on the range vector. // Large range vector will be not split into smaller intervals which can cause the queries to be slow by over-processing data. 
func reduceSplitIntervalForRangeVector(r queryrangebase.Request, interval time.Duration) (time.Duration, error) { - maxRange, err := maxRangeVector(r.GetQuery()) + maxRange, err := maxRangeVectorDuration(r.GetQuery()) if err != nil { return 0, err }