From 798677ae7f3ef43e1432ca411b6261be5f5d10fe Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 22 Jun 2022 16:38:19 +0200 Subject: [PATCH] Fix panic in instant query splitting when using unwrapped rate (#6348) * Fix panic in instant query splitting when using unwrapped rate The range aggregation `rate()` supports both log ranges and unwrapped ranges, e.g. `rate({app="foo"} [$__interval])` and `rate({app="foo"} | unwrap bar [$__interval])` Since `rate()` was split into multiple `count_over_time()` over total duration, but `count_over_time()` does not support `unwrap`, unwrapped rate queries caused panics. This fix changes the splitting of `rate({app="foo"} | unwrap bar [$__interval]` into multiple `sum_over_time()` over total duration. Fixes #6344 Signed-off-by: Christian Haudum * Add tests Signed-off-by: Christian Haudum * Integrate review feedback Co-authored-by: Susana Ferreira Co-authored-by: Susana Ferreira --- pkg/logql/downstream_test.go | 11 +++++++++++ pkg/logql/rangemapper.go | 14 ++++++++++++-- pkg/logql/rangemapper_test.go | 29 +++++++++++++++++++++++++---- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index bc532a05e02ef..36b97b4044ffa 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -125,6 +125,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, {`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second}, {`rate({a=~".+"}[2s])`, time.Second}, + {`rate({a=~".+"} | unwrap b [2s])`, time.Second}, {`bytes_rate({a=~".+"}[2s])`, time.Second}, // sum @@ -136,6 +137,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum(rate({a=~".+"}[2s]))`, time.Second}, + {`sum(rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(bytes_rate({a=~".+"}[2s]))`, time.Second}, // sum by @@ -147,6 +149,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`sum by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`sum by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // count @@ -158,6 +161,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`count(rate({a=~".+"}[2s]))`, time.Second}, + {`count(rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count(bytes_rate({a=~".+"}[2s]))`, time.Second}, // count by @@ -169,6 +173,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`count by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`count by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // max @@ -180,6 +185,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`max(rate({a=~".+"}[2s]))`, time.Second}, + {`max(rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max(bytes_rate({a=~".+"}[2s]))`, time.Second}, // max by @@ -191,6 +197,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`max by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`max by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // min @@ -202,6 +209,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min(rate({a=~".+"}[2s]))`, time.Second}, + {`min(rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(bytes_rate({a=~".+"}[2s]))`, time.Second}, // min by @@ -213,6 +221,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, {`min by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`min by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, // Label extraction stage @@ -227,6 +236,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`sum(rate({a=~".+"} | logfmt[2s]))`, time.Second}, + {`sum(rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second}, {`sum by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, {`sum by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, @@ -236,6 +246,7 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`sum by (a) (rate({a=~".+"} | logfmt[2s]))`, time.Second}, + {`sum by (a) (rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum by (a) (bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second}, {`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 22187d4a0f402..70efa06ba9733 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -183,9 +183,11 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool { // sumOverFullRange returns an expression that sums up individual downstream queries (with preserving labels) // and dividing it by the full range in seconds to calculate a rate value. // The operation defines the range aggregation operation of the downstream queries. -// Example: +// Examples: // rate({app="foo"}[2m]) // => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120) +// rate({app="foo"} | unwrap bar [2m]) +// => (sum without (sum_over_time({app="foo"}[1m]) ++ sum_over_time({app="foo"}[1m]) offset 1m) / 120) func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr { var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{ Left: expr.Left, @@ -373,7 +375,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum { return expr } - return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder) + // rate({app="foo"}[2m]) => + // => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120) + op := syntax.OpRangeTypeCount + if expr.Left.Unwrap != nil { + // rate({app="foo"} | unwrap bar [2m]) + // => (sum without (sum_over_time({app="foo"}[1m]) ++ sum_over_time({app="foo"}[1m]) offset 1m) / 120) + op = syntax.OpRangeTypeSum + } + return m.sumOverFullRange(expr, vectorAggrPushdown, op, rangeInterval, recorder) case syntax.OpRangeTypeBytesRate: if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum { return expr diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index ba2dfc719ef70..970526934c7ff 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -130,6 +130,14 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ++ downstream> ) / 180)`, }, + { + `rate({app="foo"} | unwrap bar[3m])`, + `(sum without( + downstream> + ++ downstream> + ++ downstream> + ) / 180)`, + }, { `bytes_rate({app="foo"}[3m])`, `(sum without( @@ -1471,6 +1479,23 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) )`, }, + + // regression test queries + { + `topk(10,sum by (org_id) (rate({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [3m])))`, + `topk(10, + sum by (org_id) ( + ( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + / 180 + ) + ) + )`, + }, } { tc := tc t.Run(tc.expr, func(t *testing.T) { @@ -1500,10 +1525,6 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { `sum(avg_over_time({app="foo"} | unwrap bar[3m]))`, `sum(avg_over_time({app="foo"} | unwrap bar[3m]))`, }, - { // this query caused a panic in ops - `topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`, - `topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`, - }, // should be noop if range interval is lower or equal to split interval (1m) {