From 1d3e90663db7474e50578ec6500da5340d259fb4 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 9 May 2022 21:21:56 +0200 Subject: [PATCH 1/5] Add test cases that fail Instant queries of the following type fail and return an `unimplemented` error: ``` sum(count_over_time({foo="bar"} | logfmt | duration > 2s [3s])) / sum(count_over_time({foo="bar"} [3s])) ``` Signed-off-by: Christian Haudum --- pkg/logql/downstream_test.go | 8 ++ pkg/logql/rangemapper_test.go | 148 +++++++++++++++++++++++++--------- 2 files changed, 120 insertions(+), 36 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index b584d721b22c9..a845337e61c64 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -132,14 +132,19 @@ func TestRangeMappingEquivalence(t *testing.T) { // sum {`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum(bytes_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, {`sum(count_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum(count_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, {`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(rate({a=~".+"}[2s]))`, time.Second}, {`sum(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -215,9 +220,11 @@ func TestRangeMappingEquivalence(t *testing.T) { 
{`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`min(rate({a=~".+"}[2s]))`, time.Second}, {`min(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -237,6 +244,7 @@ func TestRangeMappingEquivalence(t *testing.T) { // Binary operations {`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second}, {`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second}, + {`sum(count_over_time({a=~".+"} | logfmt | b > 2 [3s])) / sum(count_over_time({a=~".+"} [3s]))`, time.Second}, // Multi vector aggregator layer queries {`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second}, diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index e3ed1ac726252..532548917a751 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -932,6 +932,26 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) )`, }, + { + `sum (count_over_time({app="foo"} | logfmt | duration > 10s [3m])) / sum (count_over_time({app="foo"} [3m]))`, + `( + sum ( + sum without ( + downstream 10s [1m] offset 2m0s)), shard=> + ++ downstream 10s [1m] offset 1m0s)), shard=> + ++ downstream 10s [1m])), shard=> + ) + ) + / + sum ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + ) + )`, + }, // Multi vector aggregator layer queries { @@ -959,6 +979,98 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) )`, }, + + // outer vector aggregation is pushed down + { + 
`sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + min without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + max without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + min without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + max without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + min without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + max without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, } { tc := tc t.Run(tc.expr, func(t *testing.T) { @@ -997,42 +1109,6 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { // should be noop if inner range aggregation includes a stage for label extraction such as `| json` or `| logfmt` // because otherwise the downstream queries would result in too many series - { - `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - 
`sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, - { - `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - }, { `max_over_time({app="foo"} | json | unwrap bar [3m])`, `max_over_time({app="foo"} | json | unwrap bar [3m])`, From 16920d909dc2f41d866f4e35845d37a6cca08f8b Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 10 May 2022 10:54:02 +0200 Subject: [PATCH 2/5] Fix certain binary expressions for instant queries If either the left hand side or the right hand side of a binary expression is a noop, we need to return the original expression so the whole expression is a noop as well, and thus is not executed using the downstream engine. Otherwise, a binary expression that has a noop on either side results in an `unimplemented` error when executed using the downstream engine. 
Signed-off-by: Christian Haudum --- pkg/logql/rangemapper.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 86cf2d00274ed..9f54d6845f034 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -128,10 +128,22 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect if err != nil { return nil, err } + // if left hand side is a noop, we need to return the original expression + // so the whole expression is a noop and thus not executed using the + // downstream engine + if e.SampleExpr.String() == lhsMapped.String() { + return e, nil + } rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder) if err != nil { return nil, err } + // if right hand side is a noop, we need to return the original expression + // so the whole expression is a noop and thus not executed using the + // downstream engine + if e.RHS.String() == rhsMapped.String() { + return e, nil + } e.SampleExpr = lhsMapped e.RHS = rhsMapped return e, nil From fe40f4442c174f5351de7488a9aaf64344577791 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 10 May 2022 10:58:56 +0200 Subject: [PATCH 3/5] Optimize instant vector aggregations with log extraction stage This change optimizes queries that use a vector aggregation without grouping around a range aggregation with a label extraction stage such as `json` or `logfmt`. Since the vector aggregation can be pushed down to the downstream query, the downstream query does not create a massive amount of streams, even though it contains a generic label extraction stage. 
Signed-off-by: Christian Haudum --- pkg/logql/rangemapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 9f54d6845f034..88de6b9ce2174 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -343,7 +343,7 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, // We cannot execute downstream queries that would potentially produce a huge amount of series // and therefore would very likely fail. - if expr.Grouping == nil && hasLabelExtractionStage(expr) { + if expr.Grouping == nil && vectorAggrPushdown == nil && hasLabelExtractionStage(expr) { return expr } switch expr.Operation { From 589f92f3c1d45074e75f6a579b99c415fe259d2b Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 11 May 2022 08:47:45 +0200 Subject: [PATCH 4/5] fixup! Add test cases that fail Signed-off-by: Christian Haudum --- pkg/logql/rangemapper_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index 532548917a751..f1a0f57baff23 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -1113,6 +1113,12 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { `max_over_time({app="foo"} | json | unwrap bar [3m])`, `max_over_time({app="foo"} | json | unwrap bar [3m])`, }, + + // if one side of a binary expression is a noop, the full query is a noop as well + { + `sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`, + `sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`, + }, } { tc := tc t.Run(tc.expr, func(t *testing.T) { From f225d3e5e19a811ea31b056e9605eb80e129d815 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 11 May 2022 10:47:37 +0200 Subject: [PATCH 5/5] fixup! fixup! 
Add test cases that fail Signed-off-by: Christian Haudum --- pkg/logql/rangemapper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index f1a0f57baff23..0f178d2393edc 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -1117,7 +1117,7 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { // if one side of a binary expression is a noop, the full query is a noop as well { `sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`, - `sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`, + `(sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m]))`, }, } { tc := tc