Skip to content

Commit

Permalink
Add label grouping downstream optimization to rate and bytes_rate exp…
Browse files Browse the repository at this point in the history
…ressions
  • Loading branch information
ssncferreira committed Mar 18, 2022
1 parent 6449bf9 commit 797d2b1
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 22 deletions.
98 changes: 94 additions & 4 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"}[2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},

// sum by
{`sum by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -146,6 +148,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// count
{`count(bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -155,6 +159,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(rate({a=~".+"}[2s]))`, time.Second},
{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},

// count by
{`count by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -164,6 +170,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// max
{`max(bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -173,6 +181,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(rate({a=~".+"}[2s]))`, time.Second},
{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},

// max by
{`max by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -182,6 +192,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// min
{`min(bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -191,6 +203,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(rate({a=~".+"}[2s]))`, time.Second},
{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},

// min by
{`min by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
Expand All @@ -200,9 +214,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},

{`sum by (b) (rate({a=~".+"}[2s]))`, time.Second},
{`sum by (b) (bytes_rate({a=~".+"}[2s]))`, time.Second},
{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// Binary operations
{`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second},
Expand All @@ -211,7 +224,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
// Multi vector aggregator layer queries
{`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second},

// Non-splittable vector aggregators - TODO: Fix
// Non-splittable vector aggregators
// TODO: Fix topk
//{`topk(2, count_over_time({a=~".+"}[2s]))`, time.Second},
{`avg(count_over_time({a=~".+"}[2s]))`, time.Second},

Expand Down Expand Up @@ -593,6 +607,82 @@ func TestRangeMappingEquivalenceMockMapper(t *testing.T) {
Operation: syntax.OpTypeMin,
},
},

{
`sum by (a) (rate({a=~".+"}[2s]))`,
&syntax.VectorAggregationExpr{
Left: &syntax.VectorAggregationExpr{
Left: &syntax.BinOpExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: time.Second,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeCount,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeSum,
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: 0,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeCount,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeSum,
},
},
next: nil,
},
},
Grouping: &syntax.Grouping{
Without: true,
},
Operation: syntax.OpTypeSum,
},
RHS: &syntax.LiteralExpr{Val: (2 * time.Second).Seconds()},
Op: syntax.OpTypeDiv,
Opts: &syntax.BinOpOptions{},
},
Grouping: &syntax.Grouping{
Without: true,
},
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Operation: syntax.OpTypeSum,
},
},
} {
q := NewMockQuerier(
shards,
Expand Down
27 changes: 17 additions & 10 deletions pkg/logql/rangemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,26 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
// Example:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
func (m RangeVectorMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr {
without := &syntax.Grouping{
Without: true,
}
downstreamExpr := &syntax.RangeAggregationExpr{
func (m RangeVectorMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr {
var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
Left: expr.Left,
Operation: operation,
}
// Optimization: in case overrideDownstream exists, the downstream expression can be optimized with the grouping
// and operation of the overrideDownstream expression in order to reduce the returned streams' label set.
if overrideDownstream != nil {
downstreamExpr = &syntax.VectorAggregationExpr{
Left: downstreamExpr,
Grouping: overrideDownstream.Grouping,
Operation: overrideDownstream.Operation,
}
}
return &syntax.BinOpExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval),
Grouping: without,
Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval),
Grouping: &syntax.Grouping{
Without: true,
},
Operation: syntax.OpTypeSum,
},
RHS: &syntax.LiteralExpr{Val: rangeInterval.Seconds()},
Expand All @@ -143,7 +151,6 @@ func (m RangeVectorMapper) splitDownstreams(downstreams *ConcatSampleExpr, expr
concrete.Left.Offset += offset
}
}

})
downstreams = &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
Expand Down Expand Up @@ -281,9 +288,9 @@ func (m RangeVectorMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregation
Operation: syntax.OpTypeMin,
}
case syntax.OpRangeTypeRate:
return m.sumOverFullRange(expr, syntax.OpRangeTypeCount, rangeInterval)
return m.sumOverFullRange(expr, overrideDownstream, syntax.OpRangeTypeCount, rangeInterval)
case syntax.OpRangeTypeBytesRate:
return m.sumOverFullRange(expr, syntax.OpRangeTypeBytes, rangeInterval)
return m.sumOverFullRange(expr, overrideDownstream, syntax.OpRangeTypeBytes, rangeInterval)
default:
// this should not be reachable. If an operation is splittable it should
// have an optimization listed
Expand Down
Loading

0 comments on commit 797d2b1

Please sign in to comment.