Skip to content

Commit

Permalink
Split by range of instant queries (#5662)
Browse files Browse the repository at this point in the history
* Split by range on Instant queries POC v3

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>

* Handle uneven split by duration

* Register SplitByRangeMiddleware in roundtripper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Register SplitByRangeMiddleware in roundtripper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! fixup! Register SplitByRangeMiddleware in roundtripper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove rewrite if range aggr has label extraction stage

In case a range aggregation has a generic label extraction stage, such
as `| json` or `| logfmt` and no group by, we cannot split it, because
otherwise the downstream queries would result in too many series.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linting

* Implement range splitting for rate() and bytes_rate()

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linting

* Calculate offset of downstream queries correctly

if the outer query range contains an offset as well.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linting

* Add optimization by moving the outer label grouping downstream

* Add label grouping downstream optimization to rate and bytes_rate expressions

* Add changelog entry

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Simplify types in rangemapper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Simplify types in rangemapper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Check in Map function if query is splittable by range

Since this is the main function of the mapper, we can ensure here that
only supported vector/range aggregations are handled.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Some code cleanups and variable renaming

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Extract duplicate code in range aggr mapping into function

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add topk to supported splittable vector aggregations

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Check if query is splittable by range before calling Map()

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add more function comments

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Rename RangeVectorMapper to RangeMapper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix incorrect import due to rebase

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add equivalence test cases with `logfmt` pipeline stage

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove limitation of pushing down vector aggr only if grouping is present

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove TestRangeMappingEquivalenceMockMapper test

This test is essentially the same as the test
Test_SplitRangeVectorMapping, just using a different representation of
the result.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Remove limitation of pushing down vector aggr only if grouping is present

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! fixup! Remove limitation of pushing down vector aggr only if grouping is present

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linter errors

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Better naming of variable

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Split SplitRangeVectorMapping test into two

to have the test for noop queries separated

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
3 people authored Apr 7, 2022
1 parent cc3a8e4 commit 0bce8d9
Show file tree
Hide file tree
Showing 7 changed files with 1,893 additions and 2 deletions.
202 changes: 201 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,207 @@ func TestMappingEquivalence(t *testing.T) {
}
}

// TestRangeMappingEquivalence verifies that executing a query with the regular
// engine and executing its range-split rewrite (produced by RangeMapper) with
// the downstream engine yield identical results.
func TestRangeMappingEquivalence(t *testing.T) {
	var (
		shards    = 3
		nStreams  = 60
		rounds    = 20
		streams   = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
		start     = time.Unix(0, 0)
		end       = time.Unix(0, int64(time.Second*time.Duration(rounds)))
		step      = time.Second
		interval  = time.Duration(0)
		limit     = 100
	)

	for _, tc := range []struct {
		query           string
		splitByInterval time.Duration
	}{
		// Range vector aggregators
		{`bytes_over_time({a=~".+"}[2s])`, time.Second},
		{`count_over_time({a=~".+"}[2s])`, time.Second},
		{`sum_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
		{`max_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
		{`max_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
		{`max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second},
		{`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
		{`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
		{`min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second},
		{`rate({a=~".+"}[2s])`, time.Second},
		{`bytes_rate({a=~".+"}[2s])`, time.Second},

		// sum
		{`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum(rate({a=~".+"}[2s]))`, time.Second},
		{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},

		// sum by
		{`sum by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

		// count
		{`count(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`count(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`count(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count(rate({a=~".+"}[2s]))`, time.Second},
		{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},

		// count by
		{`count by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`count by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`count by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

		// max
		{`max(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`max(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`max(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max(rate({a=~".+"}[2s]))`, time.Second},
		{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},

		// max by
		{`max by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`max by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`max by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

		// min
		{`min(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`min(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`min(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min(rate({a=~".+"}[2s]))`, time.Second},
		{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},

		// min by
		{`min by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`min by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`min by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

		// Binary operations
		{`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second},
		{`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second},

		// Multi vector aggregator layer queries
		{`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second},
		{`sum(min by (a)(max(sum by (b) (count_over_time({a=~".+"} [2s])))))`, time.Second},

		// Non-splittable vector aggregators
		// TODO: Fix topk
		//{`topk(2, count_over_time({a=~".+"}[2s]))`, time.Second},
		{`avg(count_over_time({a=~".+"}[2s]))`, time.Second},

		// Uneven split times
		{`bytes_over_time({a=~".+"}[3s])`, 2 * time.Second},
		{`count_over_time({a=~".+"}[5s])`, 2 * time.Second},

		// range with offset
		{`rate({a=~".+"}[2s] offset 2s)`, time.Second},
	} {
		q := NewMockQuerier(
			shards,
			streams,
		)

		opts := EngineOpts{}
		regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger())
		downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger())

		t.Run(tc.query, func(t *testing.T) {
			ctx := user.InjectOrgID(context.Background(), "fake")

			params := NewLiteralParams(
				tc.query,
				start,
				end,
				step,
				interval,
				logproto.FORWARD,
				uint32(limit),
				nil,
			)

			// Baseline: execute the unmodified query with the regular engine.
			qry := regularEngine.Query(params)
			res, err := qry.Exec(ctx)
			// require.NoError (rather than require.Nil) reports the error text on failure.
			require.NoError(t, err)

			// Rewrite the query split by range and execute it with the downstream engine.
			rangeMapper, err := NewRangeMapper(tc.splitByInterval)
			require.NoError(t, err)
			noop, rangeExpr, err := rangeMapper.Parse(tc.query)
			require.NoError(t, err)

			// Every test case must actually be rewritten; a noop would not
			// exercise the downstream execution path.
			require.False(t, noop, "downstream engine cannot execute noop")

			rangeQry := downstreamEngine.Query(params, rangeExpr)
			rangeRes, err := rangeQry.Exec(ctx)
			require.NoError(t, err)

			// Results must match exactly, not just approximately.
			require.Equal(t, res.Data, rangeRes.Data)
		})
	}
}

// approximatelyEquals ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
require.Equal(t, len(as), len(bs))

Expand Down
Loading

0 comments on commit 0bce8d9

Please sign in to comment.