Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid splitting large range vector aggregation. #5172

Merged
merged 5 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/tenant"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ type splitByInterval struct {
splitter Splitter
}

type Splitter func(req queryrangebase.Request, interval time.Duration) []queryrangebase.Request
type Splitter func(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)

// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
func SplitByIntervalMiddleware(limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
Expand Down Expand Up @@ -102,7 +103,7 @@ func (h *splitByInterval) Process(
}

// don't spawn unnecessary goroutines
var p = parallelism
p := parallelism
if len(input) < parallelism {
p = len(input)
}
Expand Down Expand Up @@ -170,7 +171,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.next.Do(ctx, r)
}

intervals := h.splitter(r, interval)
intervals, err := h.splitter(r, interval)
if err != nil {
return nil, err
}
h.metrics.splits.Observe(float64(len(intervals)))

// no interval should not be processed by the frontend.
Expand All @@ -182,6 +186,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
sp.LogFields(otlog.Int("n_intervals", len(intervals)))
}

if len(intervals) == 1 {
return h.next.Do(ctx, intervals[0])
}

var limit int64
switch req := r.(type) {
case *LokiRequest:
Expand Down Expand Up @@ -213,7 +221,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.merger.MergeResponse(resps...)
}

func splitByTime(req queryrangebase.Request, interval time.Duration) []queryrangebase.Request {
func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request

switch r := req.(type) {
Expand Down Expand Up @@ -248,9 +256,9 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) []queryrang
})
})
default:
return nil
return nil, nil
}
return reqs
return reqs, nil
}

func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) {
Expand All @@ -263,8 +271,42 @@ func forInterval(interval time.Duration, start, end time.Time, callback func(sta
}
}

func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []queryrangebase.Request {
// maxRangeVectorDuration returns the maximum range vector duration within a LogQL query.
func maxRangeVectorDuration(q string) (time.Duration, error) {
expr, err := logql.ParseSampleExpr(q)
if err != nil {
return 0, err
}
var max time.Duration
expr.Walk(func(e interface{}) {
if r, ok := e.(*logql.LogRange); ok && r.Interval > max {
max = r.Interval
}
})
return max, nil
}

// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector.
// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data.
func reduceSplitIntervalForRangeVector(r queryrangebase.Request, interval time.Duration) (time.Duration, error) {
maxRange, err := maxRangeVectorDuration(r.GetQuery())
if err != nil {
return 0, err
}
if maxRange > interval {
return maxRange, nil
}
return interval, nil
}

func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request

interval, err := reduceSplitIntervalForRangeVector(r, interval)
if err != nil {
return nil, err
}

lokiReq := r.(*LokiRequest)
// step is >= configured split interval, let us just split the query interval by step
if lokiReq.Step >= interval.Milliseconds() {
Expand All @@ -280,7 +322,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []query
})
})

return reqs
return reqs, nil
}

for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
Expand All @@ -298,7 +340,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) []query
EndTs: end,
})
}
return reqs
return reqs, nil
}

// Round up to the step before the next interval boundary.
Expand Down
Loading