From 797d2b185539803ab1f397aa6adf87238c6a5f0d Mon Sep 17 00:00:00 2001 From: Susana Ferreira Date: Fri, 18 Mar 2022 12:10:08 +0100 Subject: [PATCH] Add label grouping downstream optimization to rate and bytes_rate expressions --- pkg/logql/downstream_test.go | 98 +++++++++++++++++- pkg/logql/rangemapper.go | 27 +++-- pkg/logql/rangemapper_test.go | 190 ++++++++++++++++++++++++++++++++-- 3 files changed, 293 insertions(+), 22 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index c9a7a6d60b534..05197c5741a9f 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -137,6 +137,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`sum(rate({a=~".+"}[2s]))`, time.Second}, + {`sum(bytes_rate({a=~".+"}[2s]))`, time.Second}, // sum by {`sum by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -146,6 +148,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`sum by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // count {`count(bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -155,6 +159,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`count(rate({a=~".+"}[2s]))`, time.Second}, + {`count(bytes_rate({a=~".+"}[2s]))`, time.Second}, // count by {`count by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -164,6 +170,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`count by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // max {`max(bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -173,6 +181,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`max(rate({a=~".+"}[2s]))`, time.Second}, + {`max(bytes_rate({a=~".+"}[2s]))`, time.Second}, // max by {`max by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -182,6 +192,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`max by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // min {`min(bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -191,6 +203,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`min(rate({a=~".+"}[2s]))`, time.Second}, + {`min(bytes_rate({a=~".+"}[2s]))`, time.Second}, // min by {`min by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, @@ -200,9 +214,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - - {`sum by (b) (rate({a=~".+"}[2s]))`, time.Second}, - {`sum by (b) (bytes_rate({a=~".+"}[2s]))`, time.Second}, + {`min by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // Binary operations {`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second}, @@ -211,7 +224,8 @@ func TestRangeMappingEquivalence(t *testing.T) { // Multi vector aggregator layer queries {`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second}, - // Non-splittable vector aggregators - TODO: Fix + // Non-splittable vector aggregators + // TODO: Fix topk //{`topk(2, count_over_time({a=~".+"}[2s]))`, time.Second}, {`avg(count_over_time({a=~".+"}[2s]))`, time.Second}, @@ -593,6 +607,82 @@ func TestRangeMappingEquivalenceMockMapper(t *testing.T) { Operation: syntax.OpTypeMin, }, }, + + { + `sum by (a) (rate({a=~".+"}[2s]))`, + &syntax.VectorAggregationExpr{ + Left: &syntax.VectorAggregationExpr{ + Left: &syntax.BinOpExpr{ + SampleExpr: &syntax.VectorAggregationExpr{ + Left: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + SampleExpr: &syntax.VectorAggregationExpr{ + Left: &syntax.RangeAggregationExpr{ + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{labelMatcher}, + }, + Interval: time.Second, + Offset: time.Second, + Unwrap: nil, + }, + Operation: syntax.OpRangeTypeCount, + Grouping: nil, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"a"}, + Without: false, + }, + Params: 0, + Operation: syntax.OpTypeSum, + }, + }, + next: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + SampleExpr: &syntax.VectorAggregationExpr{ + Left: &syntax.RangeAggregationExpr{ + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{labelMatcher}, + }, + Interval: time.Second, + Offset: 0, + Unwrap: nil, + }, + Operation: syntax.OpRangeTypeCount, + Grouping: nil, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"a"}, + Without: false, + }, + Params: 0, + Operation: syntax.OpTypeSum, + }, + }, + next: nil, + }, + }, + Grouping: &syntax.Grouping{ + Without: true, + }, + Operation: syntax.OpTypeSum, + }, + RHS: &syntax.LiteralExpr{Val: (2 * time.Second).Seconds()}, + Op: syntax.OpTypeDiv, + Opts: &syntax.BinOpOptions{}, + }, + Grouping: &syntax.Grouping{ + Without: true, + }, + }, + Grouping: &syntax.Grouping{ + Groups: []string{"a"}, + Without: false, + }, + Operation: syntax.OpTypeSum, + }, + }, } { q := NewMockQuerier( shards, diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index f713ef67d7020..97ae1f76787dd 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -110,18 +110,26 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool { // Example: // rate({app="foo"}[2m]) // => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120) -func (m RangeVectorMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr { - without := &syntax.Grouping{ - Without: true, - } - downstreamExpr := &syntax.RangeAggregationExpr{ +func (m RangeVectorMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr { + var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{ Left: expr.Left, Operation: operation, } + // Optimization: in case overrideDownstream exists, the downstream expression can be optimized with the grouping + // and operation of the overrideDownstream expression in order to reduce the returned streams' label set. + if overrideDownstream != nil { + downstreamExpr = &syntax.VectorAggregationExpr{ + Left: downstreamExpr, + Grouping: overrideDownstream.Grouping, + Operation: overrideDownstream.Operation, + } + } return &syntax.BinOpExpr{ SampleExpr: &syntax.VectorAggregationExpr{ - Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval), - Grouping: without, + Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval), + Grouping: &syntax.Grouping{ + Without: true, + }, Operation: syntax.OpTypeSum, }, RHS: &syntax.LiteralExpr{Val: rangeInterval.Seconds()}, @@ -143,7 +151,6 @@ func (m RangeVectorMapper) splitDownstreams(downstreams *ConcatSampleExpr, expr concrete.Left.Offset += offset } } - }) downstreams = &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ @@ -281,9 +288,9 @@ func (m RangeVectorMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregation Operation: syntax.OpTypeMin, } case syntax.OpRangeTypeRate: - return m.sumOverFullRange(expr, syntax.OpRangeTypeCount, rangeInterval) + return m.sumOverFullRange(expr, overrideDownstream, syntax.OpRangeTypeCount, rangeInterval) case syntax.OpRangeTypeBytesRate: - return m.sumOverFullRange(expr, syntax.OpRangeTypeBytes, rangeInterval) + return m.sumOverFullRange(expr, overrideDownstream, syntax.OpRangeTypeBytes, rangeInterval) default: // this should not be reachable. If an operation is splittable it should // have an optimization listed diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index 98eeba266e54b..8cd93edf5a403 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -131,7 +131,6 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, - // range aggregation - rate { `rate({app="foo"}[3m])`, `(sum without( @@ -141,7 +140,6 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) / 180)`, false, }, - // range aggregation - bytes_rate { `bytes_rate({app="foo"}[3m])`, `(sum without( @@ -230,6 +228,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `sum(rate({app="foo"}[3m]))`, + `sum( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `sum(bytes_rate({app="foo"}[3m]))`, + `sum( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - sum by { @@ -309,6 +329,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `sum by (baz) (rate({app="foo"}[3m]))`, + `sum by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `sum by (baz) (bytes_rate({app="foo"}[3m]))`, + `sum by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - count { @@ -388,6 +430,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `count(rate({app="foo"}[3m]))`, + `count( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `count(bytes_rate({app="foo"}[3m]))`, + `count( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - count by { @@ -467,6 +531,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `count by (baz) (rate({app="foo"}[3m]))`, + `count by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `count by (baz) (bytes_rate({app="foo"}[3m]))`, + `count by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - max { @@ -546,6 +632,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `max(rate({app="foo"}[3m]))`, + `max( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `max(bytes_rate({app="foo"}[3m]))`, + `max( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - max by { @@ -625,6 +733,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `max by (baz) (rate({app="foo"}[3m]))`, + `max by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `max by (baz) (bytes_rate({app="foo"}[3m]))`, + `max by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - min { @@ -704,6 +834,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `min(rate({app="foo"}[3m]))`, + `min( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `min(bytes_rate({app="foo"}[3m]))`, + `min( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Vector aggregator - min by { @@ -783,6 +935,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, false, }, + { + `min by (baz) (rate({app="foo"}[3m]))`, + `min by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, + { + `min by (baz) (bytes_rate({app="foo"}[3m]))`, + `min by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + false, + }, // Binary operations { @@ -828,17 +1002,17 @@ func Test_SplitRangeVectorMapping(t *testing.T) { `( sum by (app) ( (sum without ( - downstream> - ++ downstream> - ++ downstream> + downstream> + ++ downstream> + ++ downstream> ) / 180) ) / sum by (app) ( (sum without ( - downstream> - ++ downstream> - ++ downstream> + downstream> + ++ downstream> + ++ downstream> ) / 180) ) )`,