Skip to content

Commit

Permalink
[TSDB] Use index sampling to determine shard factor (#6396)
Browse files Browse the repository at this point in the history
* use index sampling to determine shard factor

* logging/tracing + better parallelism planning

* queryrange downstreamer now checks max_query_parallelism

* lint

* handles interval, offset in tsdb planning and adds logging

* fixes ns->ms confusion in index stats proto

* handle zero shard value without panics

* shardmapper will downstream a single unsharded query

* uses concat expr with no shards to avoid query parsing errors

* better logging

* fixes wrong Size() method call and rounds to nearest KB when calculating chunk size

* humanize bytes in log line

* only adds defaultLookback to index sampling when interval is zero

* removes comment

* more logging

* adjust through correctly

* adds query length for index queries
  • Loading branch information
owen-d authored Jun 16, 2022
1 parent 01fe534 commit d6f50ca
Show file tree
Hide file tree
Showing 16 changed files with 394 additions and 69 deletions.
14 changes: 8 additions & 6 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ operand expression can take advantage of the parallel execution model:
// querying the underlying backend shards individually and re-aggregating them.
type DownstreamEngine struct {
logger log.Logger
timeout time.Duration
opts EngineOpts
downstreamable Downstreamable
limits Limits
}
Expand All @@ -53,19 +53,21 @@ func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, limits
opts.applyDefault()
return &DownstreamEngine{
logger: logger,
timeout: opts.Timeout,
opts: opts,
downstreamable: downstreamable,
limits: limits,
}
}

// Opts returns the EngineOpts stored on the engine by NewDownstreamEngine.
func (ng *DownstreamEngine) Opts() EngineOpts { return ng.opts }

// Query constructs a Query
func (ng *DownstreamEngine) Query(p Params, mapped syntax.Expr) Query {
func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.Expr) Query {
return &query{
logger: ng.logger,
timeout: ng.timeout,
timeout: ng.opts.Timeout,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()),
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
parse: func(_ context.Context, _ string) (syntax.Expr, error) {
return mapped, nil
},
Expand Down Expand Up @@ -158,7 +160,7 @@ func ParseShards(strs []string) (Shards, error) {
}

type Downstreamable interface {
Downstreamer() Downstreamer
Downstreamer(context.Context) Downstreamer
}

type DownstreamQuery struct {
Expand Down
7 changes: 3 additions & 4 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ func TestMappingEquivalence(t *testing.T) {
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

mapper, err := NewShardMapper(shards, nilShardMetrics)
require.Nil(t, err)
mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics)
_, mapped, err := mapper.Parse(tc.query)
require.Nil(t, err)

shardedQry := sharded.Query(params, mapped)
shardedQry := sharded.Query(ctx, params, mapped)

res, err := qry.Exec(ctx)
require.Nil(t, err)
Expand Down Expand Up @@ -331,7 +330,7 @@ func TestRangeMappingEquivalence(t *testing.T) {

require.False(t, noop, "downstream engine cannot execute noop")

rangeQry := downstreamEngine.Query(params, rangeExpr)
rangeQry := downstreamEngine.Query(ctx, params, rangeExpr)
rangeRes, err := rangeQry.Exec(ctx)
require.Nil(t, err)

Expand Down
31 changes: 22 additions & 9 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@ type ShardResolver interface {
Shards(expr syntax.Expr) (int, error)
}

type constantShards int
// ConstantShards is a fixed shard count. Its Shards method reports the same
// value for every expression, so it can be passed wherever a ShardResolver
// is expected (e.g. to NewShardMapper).
type ConstantShards int

func (s constantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }
// Shards returns the constant shard count as an int. The expression argument
// is ignored and the returned error is always nil.
func (s ConstantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }

type ShardMapper struct {
shards constantShards
shards ShardResolver
metrics *MapperMetrics
}

func NewShardMapper(shards int, metrics *MapperMetrics) (ShardMapper, error) {
if shards < 2 {
return ShardMapper{}, fmt.Errorf("cannot create ShardMapper with <2 shards. Received %d", shards)
}
func NewShardMapper(resolver ShardResolver, metrics *MapperMetrics) ShardMapper {
return ShardMapper{
shards: constantShards(shards),
shards: resolver,
metrics: metrics,
}, nil
}
}

func NewShardMapperMetrics(registerer prometheus.Registerer) *MapperMetrics {
Expand Down Expand Up @@ -116,6 +113,14 @@ func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstre
if err != nil {
return nil, err
}
if shards == 0 {
return &ConcatLogSelectorExpr{
DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
shard: nil,
LogSelectorExpr: expr,
},
}, nil
}
for i := shards - 1; i >= 0; i-- {
head = &ConcatLogSelectorExpr{
DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
Expand All @@ -139,6 +144,14 @@ func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *downstreamRecorder
if err != nil {
return nil, err
}
if shards == 0 {
return &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: nil,
SampleExpr: expr,
},
}, nil
}
for i := shards - 1; i >= 0; i-- {
head = &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
Expand Down
9 changes: 3 additions & 6 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func TestShardedStringer(t *testing.T) {
}

func TestMapSampleExpr(t *testing.T) {
m, err := NewShardMapper(2, nilShardMetrics)
require.Nil(t, err)
m := NewShardMapper(ConstantShards(2), nilShardMetrics)

for _, tc := range []struct {
in syntax.SampleExpr
Expand Down Expand Up @@ -114,8 +113,7 @@ func TestMapSampleExpr(t *testing.T) {
}

func TestMappingStrings(t *testing.T) {
m, err := NewShardMapper(2, nilShardMetrics)
require.Nil(t, err)
m := NewShardMapper(ConstantShards(2), nilShardMetrics)
for _, tc := range []struct {
in string
out string
Expand Down Expand Up @@ -279,8 +277,7 @@ func TestMappingStrings(t *testing.T) {
}

func TestMapping(t *testing.T) {
m, err := NewShardMapper(2, nilShardMetrics)
require.Nil(t, err)
m := NewShardMapper(ConstantShards(2), nilShardMetrics)

for _, tc := range []struct {
in string
Expand Down
33 changes: 24 additions & 9 deletions pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ type SampleExpr interface {
// Selector is the LogQL selector to apply when retrieving logs.
Selector() LogSelectorExpr
Extractor() (SampleExtractor, error)
MatcherGroups() [][]*labels.Matcher
MatcherGroups() []MatcherRange
Expr
}

Expand Down Expand Up @@ -754,10 +754,16 @@ func (e *RangeAggregationExpr) Selector() LogSelectorExpr {
return e.Left.Left
}

func (e *RangeAggregationExpr) MatcherGroups() [][]*labels.Matcher {
func (e *RangeAggregationExpr) MatcherGroups() []MatcherRange {
xs := e.Left.Left.Matchers()
if len(xs) > 0 {
return [][]*labels.Matcher{xs}
return []MatcherRange{
{
Matchers: xs,
Interval: e.Left.Interval,
Offset: e.Left.Offset,
},
}
}
return nil
}
Expand Down Expand Up @@ -880,7 +886,7 @@ func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *Groupin
}
}

func (e *VectorAggregationExpr) MatcherGroups() [][]*labels.Matcher {
// MatcherGroups returns the matcher ranges of the wrapped sample expression
// (e.Left) unchanged; the vector aggregation itself contributes none.
func (e *VectorAggregationExpr) MatcherGroups() []MatcherRange {
return e.Left.MatcherGroups()
}

Expand Down Expand Up @@ -1005,7 +1011,7 @@ type BinOpExpr struct {
Opts *BinOpOptions
}

func (e *BinOpExpr) MatcherGroups() [][]*labels.Matcher {
// MatcherGroups returns the matcher ranges of the left-hand operand
// (e.SampleExpr) followed by those of the right-hand operand (e.RHS).
func (e *BinOpExpr) MatcherGroups() []MatcherRange {
return append(e.SampleExpr.MatcherGroups(), e.RHS.MatcherGroups()...)
}

Expand Down Expand Up @@ -1391,7 +1397,7 @@ func (e *LiteralExpr) Shardable() bool { return true }
func (e *LiteralExpr) Walk(f WalkFn) { f(e) }
func (e *LiteralExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
func (e *LiteralExpr) Matchers() []*labels.Matcher { return nil }
func (e *LiteralExpr) MatcherGroups() [][]*labels.Matcher { return nil }
func (e *LiteralExpr) MatcherGroups() []MatcherRange { return nil }
func (e *LiteralExpr) Extractor() (log.SampleExtractor, error) { return nil, nil }
func (e *LiteralExpr) Value() float64 { return e.Val }

Expand Down Expand Up @@ -1446,7 +1452,7 @@ func (e *LabelReplaceExpr) Selector() LogSelectorExpr {
return e.Left.Selector()
}

func (e *LabelReplaceExpr) MatcherGroups() [][]*labels.Matcher {
// MatcherGroups returns the matcher ranges of the wrapped sample expression
// (e.Left) unchanged.
func (e *LabelReplaceExpr) MatcherGroups() []MatcherRange {
return e.Left.MatcherGroups()
}

Expand Down Expand Up @@ -1521,13 +1527,22 @@ var shardableOps = map[string]bool{
OpTypeMul: true,
}

func MatcherGroups(expr Expr) [][]*labels.Matcher {
// MatcherRange pairs the label matchers of a selector with the range
// interval and offset of the expression they were taken from.
type MatcherRange struct {
// Matchers are the label matchers obtained from the selector's Matchers().
Matchers []*labels.Matcher
// Interval and Offset are copied from a range aggregation's Left.Interval
// and Left.Offset; both are left at zero when the range is built from a
// plain log selector.
Interval, Offset time.Duration
}

func MatcherGroups(expr Expr) []MatcherRange {
switch e := expr.(type) {
case SampleExpr:
return e.MatcherGroups()
case LogSelectorExpr:
if xs := e.Matchers(); len(xs) > 0 {
return [][]*labels.Matcher{xs}
return []MatcherRange{
{
Matchers: xs,
},
}
}
return nil
default:
Expand Down
30 changes: 23 additions & 7 deletions pkg/logql/syntax/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syntax
import (
"fmt"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -182,19 +183,34 @@ func Test_SampleExpr_String(t *testing.T) {
func TestMatcherGroups(t *testing.T) {
for i, tc := range []struct {
query string
exp [][]*labels.Matcher
exp []MatcherRange
}{
{
query: `{job="foo"}`,
exp: [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
exp: []MatcherRange{
{
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "job", "foo"),
},
},
},
},
{
query: `count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m])`,
exp: [][]*labels.Matcher{
{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
{labels.MustNewMatcher(labels.MatchEqual, "job", "bar")},
query: `count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m] offset 10m)`,
exp: []MatcherRange{
{
Interval: 5 * time.Minute,
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "job", "foo"),
},
},
{
Interval: 5 * time.Minute,
Offset: 10 * time.Minute,
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "job", "bar"),
},
},
},
},
} {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type MockDownstreamer struct {
*Engine
}

func (m MockDownstreamer) Downstreamer() Downstreamer { return m }
// Downstreamer returns the mock itself as the Downstreamer; the context
// argument is ignored.
func (m MockDownstreamer) Downstreamer(_ context.Context) Downstreamer { return m }

func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) {
results := make([]logqlmodel.Result, 0, len(queries))
Expand Down
17 changes: 15 additions & 2 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
Expand All @@ -21,7 +22,8 @@ const (
)

type DownstreamHandler struct {
next queryrangebase.Handler
limits Limits
next queryrangebase.Handler
}

func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebase.Request {
Expand Down Expand Up @@ -54,8 +56,19 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas
// from creating an unreasonably large number of goroutines, such as
// the case of a query like `a / a / a / a / a ..etc`, which could try
// to shard each leg, quickly dispatching an unreasonable number of goroutines.
func (h DownstreamHandler) Downstreamer() logql.Downstreamer {
// In the future, it's probably better to replace this with a channel based API
// so we don't have to do all this ugly edge case handling/accounting
func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer {
p := DefaultDownstreamConcurrency

// We may increase parallelism above the default,
// ensure we don't end up bottlenecking here.
if user, err := tenant.TenantID(ctx); err == nil {
if x := h.limits.MaxQueryParallelism(user); x > 0 {
p = x
}
}

locks := make(chan struct{}, p)
for i := 0; i < p; i++ {
locks <- struct{}{}
Expand Down
Loading

0 comments on commit d6f50ca

Please sign in to comment.