Skip to content

Commit

Permalink
Avoid splitting large range vector aggregation. (#5172)
Browse files Browse the repository at this point in the history
* Reduce split by for large range vector

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/querier/queryrange/split_by_interval.go

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>

* improve fn name.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
cyriltovena and chaudum authored Jan 19, 2022
1 parent fafa8d1 commit 1a7614f
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 11 deletions.
60 changes: 51 additions & 9 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/tenant"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ type splitByInterval struct {
splitter Splitter
}

type Splitter func(req queryrangebase.Request, interval time.Duration) []queryrangebase.Request
type Splitter func(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)

// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
func SplitByIntervalMiddleware(limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
Expand Down Expand Up @@ -102,7 +103,7 @@ func (h *splitByInterval) Process(
}

// don't spawn unnecessary goroutines
var p = parallelism
p := parallelism
if len(input) < parallelism {
p = len(input)
}
Expand Down Expand Up @@ -170,7 +171,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.next.Do(ctx, r)
}

intervals := h.splitter(r, interval)
intervals, err := h.splitter(r, interval)
if err != nil {
return nil, err
}
h.metrics.splits.Observe(float64(len(intervals)))

// no interval should not be processed by the frontend.
Expand All @@ -182,6 +186,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
sp.LogFields(otlog.Int("n_intervals", len(intervals)))
}

if len(intervals) == 1 {
return h.next.Do(ctx, intervals[0])
}

var limit int64
switch req := r.(type) {
case *LokiRequest:
Expand Down Expand Up @@ -213,7 +221,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.merger.MergeResponse(resps...)
}

func splitByTime(req queryrangebase.Request, interval time.Duration) []queryrangebase.Request {
func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request

switch r := req.(type) {
Expand Down Expand Up @@ -248,9 +256,9 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) []queryrang
})
})
default:
return nil
return nil, nil
}
return reqs
return reqs, nil
}

func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) {
Expand All @@ -263,8 +271,42 @@ func forInterval(interval time.Duration, start, end time.Time, callback func(sta
}
}

func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []queryrangebase.Request {
// maxRangeVectorDuration returns the maximum range vector duration within a LogQL query.
func maxRangeVectorDuration(q string) (time.Duration, error) {
expr, err := logql.ParseSampleExpr(q)
if err != nil {
return 0, err
}
var max time.Duration
expr.Walk(func(e interface{}) {
if r, ok := e.(*logql.LogRange); ok && r.Interval > max {
max = r.Interval
}
})
return max, nil
}

// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector.
// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data.
func reduceSplitIntervalForRangeVector(r queryrangebase.Request, interval time.Duration) (time.Duration, error) {
maxRange, err := maxRangeVectorDuration(r.GetQuery())
if err != nil {
return 0, err
}
if maxRange > interval {
return maxRange, nil
}
return interval, nil
}

func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request

interval, err := reduceSplitIntervalForRangeVector(r, interval)
if err != nil {
return nil, err
}

lokiReq := r.(*LokiRequest)
// step is >= configured split interval, let us just split the query interval by step
if lokiReq.Step >= interval.Milliseconds() {
Expand All @@ -280,7 +322,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []query
})
})

return reqs
return reqs, nil
}

for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
Expand All @@ -298,7 +340,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []query
EndTs: end,
})
}
return reqs
return reqs, nil
}

// Round up to the step before the next interval boundary.
Expand Down
Loading

0 comments on commit 1a7614f

Please sign in to comment.