Skip to content

Commit

Permalink
Fix panic in instant query splitting when using unwrapped rate (#6348)
Browse files Browse the repository at this point in the history
* Fix panic in instant query splitting when using unwrapped rate

The range aggregation `rate()` supports both log ranges and unwrapped
ranges, e.g.

`rate({app="foo"} [$__interval])`
and
`rate({app="foo"} | unwrap bar [$__interval])`

Since `rate()` was split into multiple `count_over_time()` subqueries whose
sum is divided by the total duration, and `count_over_time()` does not
support `unwrap`, unwrapped rate queries caused panics.

This fix changes the splitting of `rate({app="foo"} | unwrap bar [$__interval])`
to use multiple `sum_over_time()` subqueries, summed and divided by the total duration.

Fixes #6344

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add tests

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Integrate review feedback

Co-authored-by: Susana Ferreira <susana.ferreira@grafana.com>

Co-authored-by: Susana Ferreira <susana.ferreira@grafana.com>
  • Loading branch information
chaudum and ssncferreira authored Jun 22, 2022
1 parent 8dcc2d6 commit 798677a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 6 deletions.
11 changes: 11 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
{`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
{`rate({a=~".+"}[2s])`, time.Second},
{`rate({a=~".+"} | unwrap b [2s])`, time.Second},
{`bytes_rate({a=~".+"}[2s])`, time.Second},

// sum
Expand All @@ -136,6 +137,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"}[2s]))`, time.Second},
{`sum(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},

// sum by
Expand All @@ -147,6 +149,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// count
Expand All @@ -158,6 +161,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(rate({a=~".+"}[2s]))`, time.Second},
{`count(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},

// count by
Expand All @@ -169,6 +173,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`count by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// max
Expand All @@ -180,6 +185,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(rate({a=~".+"}[2s]))`, time.Second},
{`max(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},

// max by
Expand All @@ -191,6 +197,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`max by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// min
Expand All @@ -202,6 +209,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(rate({a=~".+"}[2s]))`, time.Second},
{`min(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},

// min by
Expand All @@ -213,6 +221,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`min by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// Label extraction stage
Expand All @@ -227,6 +236,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum(rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second},
{`sum by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second},
Expand All @@ -236,6 +246,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum by (a) (rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second},

{`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
Expand Down
14 changes: 12 additions & 2 deletions pkg/logql/rangemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
// sumOverFullRange returns an expression that sums up individual downstream queries (with preserving labels)
// and dividing it by the full range in seconds to calculate a rate value.
// The operation defines the range aggregation operation of the downstream queries.
// Example:
// Examples:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"} | unwrap bar [1m]) ++ sum_over_time({app="foo"} | unwrap bar [1m]) offset 1m) / 120)
func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
Left: expr.Left,
Expand Down Expand Up @@ -373,7 +375,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
}
return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder)
// rate({app="foo"}[2m]) =>
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
op := syntax.OpRangeTypeCount
if expr.Left.Unwrap != nil {
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"} | unwrap bar [1m]) ++ sum_over_time({app="foo"} | unwrap bar [1m]) offset 1m) / 120)
op = syntax.OpRangeTypeSum
}
return m.sumOverFullRange(expr, vectorAggrPushdown, op, rangeInterval, recorder)
case syntax.OpRangeTypeBytesRate:
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
Expand Down
29 changes: 25 additions & 4 deletions pkg/logql/rangemapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
++ downstream<count_over_time({app="foo"}[1m]), shard=<nil>>
) / 180)`,
},
{
`rate({app="foo"} | unwrap bar[3m])`,
`(sum without(
downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 2m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 1m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m]), shard=<nil>>
) / 180)`,
},
{
`bytes_rate({app="foo"}[3m])`,
`(sum without(
Expand Down Expand Up @@ -1471,6 +1479,23 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
)
)`,
},

// regression test queries
{
`topk(10,sum by (org_id) (rate({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [3m])))`,
`topk(10,
sum by (org_id) (
(
sum without(
downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m] offset 2m0s)),shard=<nil>>
++ downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m] offset 1m0s)),shard=<nil>>
++ downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m])),shard=<nil>>
)
/ 180
)
)
)`,
},
} {
tc := tc
t.Run(tc.expr, func(t *testing.T) {
Expand Down Expand Up @@ -1500,10 +1525,6 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
},
{ // this query caused a panic in ops
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
},

// should be noop if range interval is lower or equal to split interval (1m)
{
Expand Down

0 comments on commit 798677a

Please sign in to comment.