Split by range of instant queries #5662

Merged (33 commits, Apr 7, 2022)
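
This PR introduces a RangeMapper that rewrites the range vector of an instant metric query into several smaller subqueries the downstream engine can execute in parallel. Below is a minimal sketch of how it is driven, based on the `NewRangeMapper` and `Parse` calls in the test diff further down; the import path, example query, and printed form are illustrative assumptions, not part of the PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/logql"
)

func main() {
	// Split instant-query range vectors into 1m subqueries.
	mapper, err := logql.NewRangeMapper(time.Minute)
	if err != nil {
		panic(err)
	}

	// Parse reports noop=true when the query cannot be split by range.
	// Conceptually, count_over_time({app="foo"}[3m]) becomes the sum of
	// three 1m count_over_time subqueries at offsets 0m, 1m, and 2m.
	noop, expr, err := mapper.Parse(`sum(count_over_time({app="foo"}[3m]))`)
	if err != nil {
		panic(err)
	}
	fmt.Println(noop, expr.String())
}
```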
Commits
63e9ba6 Split by range on Instant queries POC v3 (ssncferreira, Mar 15, 2022)
c7569c3 Handle uneven split by duration (ssncferreira, Mar 16, 2022)
473465c Register SplitByRangeMiddleware in roundtripper (chaudum, Mar 16, 2022)
f364e1b fixup! Register SplitByRangeMiddleware in roundtripper (chaudum, Mar 16, 2022)
a02aa87 fixup! fixup! Register SplitByRangeMiddleware in roundtripper (chaudum, Mar 16, 2022)
a34cb03 Remove rewrite if range aggr has label extraction stage (chaudum, Mar 17, 2022)
f78a470 Fix linting (ssncferreira, Mar 17, 2022)
9c96c37 Implement range splitting for rate() and bytes_rate() (chaudum, Mar 17, 2022)
60c5781 Fix linting (ssncferreira, Mar 17, 2022)
ce6663d Calculate offset of downstream queries correctly (chaudum, Mar 17, 2022)
a649930 Fix linting (ssncferreira, Mar 18, 2022)
f2c6ac6 Add optimization by moving the outer label grouping downstream (ssncferreira, Mar 16, 2022)
98ac288 Add label grouping downstream optimization to rate and bytes_rate exp… (ssncferreira, Mar 18, 2022)
98f3dc4 Add changelog entry (chaudum, Mar 29, 2022)
6d0eaa2 Simplify types in rangemapper (chaudum, Apr 1, 2022)
749eb6a fixup! Simplify types in rangemapper (chaudum, Apr 1, 2022)
b3f2b21 Check in Map function if query is splittable by range (chaudum, Apr 1, 2022)
0148380 Some code cleanups and variable renaming (chaudum, Apr 1, 2022)
aa6736f Extract duplicate code in range aggr mapping into function (chaudum, Apr 1, 2022)
5037439 Add topk to supported splittable vector aggregations (chaudum, Apr 1, 2022)
c4d3e3f Check if query is splittable by range before calling Map() (chaudum, Apr 4, 2022)
3362f5c Add more function comments (chaudum, Apr 5, 2022)
53f16be Rename RangeVectorMapper to RangeMapper (chaudum, Apr 5, 2022)
98cf3bc Fix incorrect import due to rebase (chaudum, Apr 5, 2022)
f6d10db Add equivalence test cases with `logfmt` pipeline stage (chaudum, Apr 5, 2022)
10f0208 Remove limitation of pushing down vector aggr only if grouping is pre… (chaudum, Apr 5, 2022)
cf4be23 Remove TestRangeMappingEquivalenceMockMapper test (chaudum, Apr 6, 2022)
9cb483c fixup! Remove limitation of pushing down vector aggr only if grouping… (chaudum, Apr 6, 2022)
93c0268 fixup! fixup! Remove limitation of pushing down vector aggr only if g… (chaudum, Apr 6, 2022)
f607dfe Fix linter errors (chaudum, Apr 6, 2022)
e031615 Better naming of variable (chaudum, Apr 7, 2022)
8267d00 Split SplitRangeVectorMapping test into two (chaudum, Apr 7, 2022)
4225c90 Merge branch 'main' into instant-query-splitting-v3-optimizations (owen-d, Apr 7, 2022)
202 changes: 201 additions & 1 deletion pkg/logql/downstream_test.go
@@ -99,7 +99,207 @@ func TestMappingEquivalence(t *testing.T) {
}
}

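// TestRangeMappingEquivalence checks that executing an instant query split by
// range yields exactly the same result as executing the original, unsplit
// query.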
func TestRangeMappingEquivalence(t *testing.T) {
var (
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
interval = time.Duration(0)
limit = 100
)

for _, tc := range []struct {
query string
splitByInterval time.Duration
}{
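		// Each case pairs a LogQL instant query with the split interval handed
		// to the RangeMapper; the split result must match the unsplit result.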
// Range vector aggregators
{`bytes_over_time({a=~".+"}[2s])`, time.Second},
{`count_over_time({a=~".+"}[2s])`, time.Second},
{`sum_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
{`max_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
{`max_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
{`max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second},
{`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
{`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
{`min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second},
{`rate({a=~".+"}[2s])`, time.Second},
{`bytes_rate({a=~".+"}[2s])`, time.Second},

// sum
{`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`sum(count_over_time({a=~".+"}[2s]))`, time.Second},
{`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"}[2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},

// sum by
{`sum by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// count
{`count(bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`count(count_over_time({a=~".+"}[2s]))`, time.Second},
{`count(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`count(rate({a=~".+"}[2s]))`, time.Second},
{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},

// count by
{`count by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`count by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
{`count by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// max
{`max(bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`max(count_over_time({a=~".+"}[2s]))`, time.Second},
{`max(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`max(rate({a=~".+"}[2s]))`, time.Second},
{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},

// max by
{`max by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`max by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
{`max by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// min
{`min(bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`min(count_over_time({a=~".+"}[2s]))`, time.Second},
{`min(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`min(rate({a=~".+"}[2s]))`, time.Second},
{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},

// min by
{`min by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
{`min by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
{`min by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// Binary operations
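		// Each side of a binary operation is split independently, here with
		// 3s and 5s ranges against a 1s split interval.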
{`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second},
{`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second},

// Multi vector aggregator layer queries
{`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second},
{`sum(min by (a)(max(sum by (b) (count_over_time({a=~".+"} [2s])))))`, time.Second},

// Non-splittable vector aggregators
// TODO: Fix topk
//{`topk(2, count_over_time({a=~".+"}[2s]))`, time.Second},
Contributor: I'd like to see some successful topk.
Contributor: Let's do it in a 2nd iteration :)

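		// avg itself cannot be computed from partial averages, so the split
		// has to happen below the vector aggregation, on the inner
		// range aggregation.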
{`avg(count_over_time({a=~".+"}[2s]))`, time.Second},

// Uneven split times
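		// e.g. a 5s range with a 2s split interval decomposes into subranges
		// of 2s, 2s, and 1s, each evaluated at its own offset (commit c7569c3).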
{`bytes_over_time({a=~".+"}[3s])`, 2 * time.Second},
{`count_over_time({a=~".+"}[5s])`, 2 * time.Second},

// range with offset
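		// Split subqueries have to combine the query's own offset with the
		// per-subquery split offsets (commit ce6663d).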
{`rate({a=~".+"}[2s] offset 2s)`, time.Second},
} {
q := NewMockQuerier(
shards,
streams,
)

opts := EngineOpts{}
regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger())
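		// The downstream engine executes the split subqueries through a
		// MockDownstreamer that delegates back to the regular engine.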
downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")

params := NewLiteralParams(
tc.query,
start,
end,
step,
interval,
logproto.FORWARD,
uint32(limit),
nil,
)

// Regular engine
qry := regularEngine.Query(params)
res, err := qry.Exec(ctx)
require.Nil(t, err)

// Downstream engine - split by range
rangeMapper, err := NewRangeMapper(tc.splitByInterval)
require.Nil(t, err)
noop, rangeExpr, err := rangeMapper.Parse(tc.query)
require.Nil(t, err)

require.False(t, noop, "downstream engine cannot execute noop")

rangeQry := downstreamEngine.Query(params, rangeExpr)
rangeRes, err := rangeQry.Exec(ctx)
require.Nil(t, err)

require.Equal(t, res.Data, rangeRes.Data)
})
}
}

// approximatelyEquals ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
require.Equal(t, len(as), len(bs))
