Commit

Add optimization by moving the outer label grouping downstream
ssncferreira committed Mar 18, 2022
1 parent 488474c commit 6449bf9
Showing 4 changed files with 480 additions and 85 deletions.
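
In rough LogQL terms (a sketch derived from the expected expressions in the new test below, not an exact engine rendering), the optimization pushes the outer label grouping into each downstream subquery. For the sum by (a) case,

    sum by (a) (bytes_over_time({a=~".+"}[2s]))

is mapped to approximately

    sum by (a) (
      sum without () (
        downstream<sum by (a) (bytes_over_time({a=~".+"}[1s] offset 1s))>
        ++ downstream<sum by (a) (bytes_over_time({a=~".+"}[1s]))>
      )
    )

where downstream<...> marks a subquery executed downstream and ++ marks concatenation of the partial results.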
369 changes: 369 additions & 0 deletions pkg/logql/downstream_test.go
@@ -7,11 +7,13 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

var nilMetrics = NewShardingMetrics(nil)
@@ -263,6 +265,373 @@ func TestRangeMappingEquivalence(t *testing.T) {
}
}

// TODO: Delete - only used to ease testing of the mapper
// Tests equivalence by mocking the expression that the range mapper would produce
func TestRangeMappingEquivalenceMockMapper(t *testing.T) {
var (
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
interval = time.Duration(0)
limit = 100
)

labelMatcher, _ := labels.NewMatcher(labels.MatchRegexp, "a", ".+")

for _, tc := range []struct {
query string
expr syntax.Expr
}{
// Range vector aggregators
{
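// A 2s bytes_over_time range is split into two 1s downstream subqueries (one offset by 1s),
// and the concatenated partial results are merged with a plain sum.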
`bytes_over_time({a=~".+"}[2s])`,
&syntax.VectorAggregationExpr{
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: time.Second,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: 0,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
},
next: nil,
},
},
Grouping: &syntax.Grouping{
Without: true,
},
Operation: syntax.OpTypeSum,
},
},

{
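// sum by (a): the outer label grouping is pushed into each downstream subquery,
// and sum by (a) is reapplied to the merged partial results.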
`sum by (a) (bytes_over_time({a=~".+"}[2s]))`,
&syntax.VectorAggregationExpr{
Left: &syntax.VectorAggregationExpr{
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: time.Second,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeSum,
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: 0,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeSum,
},
},
next: nil,
},
},
Grouping: &syntax.Grouping{
Without: true,
},
Operation: syntax.OpTypeSum,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Operation: syntax.OpTypeSum,
},
},

//{
// `count by (a) (bytes_over_time({a=~".+"}[2s]))`,
// &syntax.VectorAggregationExpr{
// Left: &syntax.VectorAggregationExpr{
// Left: &ConcatSampleExpr{
// DownstreamSampleExpr: DownstreamSampleExpr{
// SampleExpr: &syntax.VectorAggregationExpr{
// Left: &syntax.RangeAggregationExpr{
// Left: &syntax.LogRange{
// Left: &syntax.MatchersExpr{
// Mts: []*labels.Matcher{labelMatcher},
// },
// Interval: time.Second,
// Offset: time.Second,
// Unwrap: nil,
// },
// Operation: syntax.OpRangeTypeBytes,
// Grouping: nil,
// },
// Grouping: &syntax.Grouping{
// Groups: []string{"a"},
// Without: false,
// },
// Params: 0,
// Operation: syntax.OpTypeCount,
// },
// },
// next: &ConcatSampleExpr{
// DownstreamSampleExpr: DownstreamSampleExpr{
// SampleExpr: &syntax.VectorAggregationExpr{
// Left: &syntax.RangeAggregationExpr{
// Left: &syntax.LogRange{
// Left: &syntax.MatchersExpr{
// Mts: []*labels.Matcher{labelMatcher},
// },
// Interval: time.Second,
// Offset: 0,
// Unwrap: nil,
// },
// Operation: syntax.OpRangeTypeBytes,
// Grouping: nil,
// },
// Grouping: &syntax.Grouping{
// Groups: []string{"a"},
// Without: false,
// },
// Params: 0,
// Operation: syntax.OpTypeCount,
// },
// },
// next: nil,
// },
// },
// Grouping: &syntax.Grouping{
// Without: true,
// },
// Operation: syntax.OpTypeSum,
// },
// Grouping: &syntax.Grouping{
// Groups: []string{"a"},
// Without: false,
// },
// Operation: syntax.OpTypeCount,
// },
//},

{
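// max by (a): the grouping is likewise pushed into the downstream subqueries,
// with max by (a) reapplied on the merged result.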
`max by (a) (bytes_over_time({a=~".+"}[2s]))`,
&syntax.VectorAggregationExpr{
Left: &syntax.VectorAggregationExpr{
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: time.Second,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeMax,
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: 0,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeMax,
},
},
next: nil,
},
},
Grouping: &syntax.Grouping{
Without: true,
},
Operation: syntax.OpTypeSum,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Operation: syntax.OpTypeMax,
},
},

{
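// min by (a): same structure as the max case, with min as the vector aggregation.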
`min by (a) (bytes_over_time({a=~".+"}[2s]))`,
&syntax.VectorAggregationExpr{
Left: &syntax.VectorAggregationExpr{
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: time.Second,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeMin,
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{labelMatcher},
},
Interval: time.Second,
Offset: 0,
Unwrap: nil,
},
Operation: syntax.OpRangeTypeBytes,
Grouping: nil,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Params: 0,
Operation: syntax.OpTypeMin,
},
},
next: nil,
},
},
Grouping: &syntax.Grouping{
Without: true,
},
Operation: syntax.OpTypeSum,
},
Grouping: &syntax.Grouping{
Groups: []string{"a"},
Without: false,
},
Operation: syntax.OpTypeMin,
},
},
} {
q := NewMockQuerier(
shards,
streams,
)

opts := EngineOpts{}
regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger())
downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")

params := NewLiteralParams(
tc.query,
start,
end,
step,
interval,
logproto.FORWARD,
uint32(limit),
nil,
)

// Regular engine
qry := regularEngine.Query(params)
res, err := qry.Exec(ctx)
require.Nil(t, err)

// Downstream engine - split by range
rangeQry := downstreamEngine.Query(params, tc.expr)
rangeRes, err := rangeQry.Exec(ctx)
require.Nil(t, err)

require.Equal(t, res.Data, rangeRes.Data)
})
}
}

// approximatelyEquals ensures two responses are approximately equal, up to 6 decimals precision per sample
func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
require.Equal(t, len(as), len(bs))