From e4fe8a1df76b41b5c5eaf0ef2272e43b031528b0 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 5 Feb 2020 20:59:43 -0500 Subject: [PATCH 01/21] [wip] sharding evaluator/ast --- pkg/logql/astmapper.go | 11 +++ pkg/logql/properties.go | 40 ++++++++++ pkg/logql/sharding.go | 170 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 221 insertions(+) create mode 100644 pkg/logql/astmapper.go create mode 100644 pkg/logql/properties.go create mode 100644 pkg/logql/sharding.go diff --git a/pkg/logql/astmapper.go b/pkg/logql/astmapper.go new file mode 100644 index 0000000000000..4e1a91324fd89 --- /dev/null +++ b/pkg/logql/astmapper.go @@ -0,0 +1,11 @@ +package logql + +// ASTMapper is the exported interface for mapping between multiple AST representations +type ASTMapper interface { + Map(Expr) (Expr, error) +} + +// CloneExpr is a helper function to clone a node. +func CloneExpr(expr Expr) (Expr, error) { + return ParseExpr(expr.String()) +} diff --git a/pkg/logql/properties.go b/pkg/logql/properties.go new file mode 100644 index 0000000000000..39977f922fbca --- /dev/null +++ b/pkg/logql/properties.go @@ -0,0 +1,40 @@ +package logql + +// technically, std{dev,var} are also parallelizable if there is no cross-shard merging +// in descendent nodes in the AST. This optimization is currently avoided for simplicity. +var parallelOperations = map[string]bool{ + OpTypeSum: true, + OpTypeAvg: true, + OpTypeMax: true, + OpTypeMin: true, + OpTypeCount: true, + OpTypeBottomK: true, + OpTypeTopK: true, + OpTypeCountOverTime: true, + OpTypeRate: true, +} + +// PropertyExpr is an expression which can determine certain properties of an expression +// and also impls ASTMapper +type PropertyExpr interface { + Expr + CanParallel() bool // Whether this expression can be parallelized + +} + +type propertyExpr struct { + Expr +} + +func (e propertyExpr) CanParallel() bool { + switch expr := e.Expr.(type) { + case *matchersExpr, *filterExpr: + return true + case *rangeAggregationExpr: + return parallelOperations[expr.operation] + case *vectorAggregationExpr: + return parallelOperations[expr.operation] && propertyExpr{expr.left}.CanParallel() + default: + return false + } +} diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go new file mode 100644 index 0000000000000..76e3393ba8de0 --- /dev/null +++ b/pkg/logql/sharding.go @@ -0,0 +1,170 @@ +package logql + +import ( + "context" + "errors" + + "github.com/grafana/loki/pkg/iter" + "github.com/prometheus/prometheus/pkg/labels" +) + +// type shardedLogSelectorExpr struct { +// LogSelectorExpr +// shard int +// } + +// type shardedSampleExpr struct { +// SampleExpr +// shard int +// } + +// type ShardMapper struct { +// shards int +// } + +// func (m ShardMapper) Map(expr Expr) (Expr, error) { +// cloned, err := CloneExpr(expr) +// if err != nil { +// return nil, err +// } + +// if (propertyExpr{cloned}).CanParallel() { +// return m.parallelize(cloned) +// } + +// switch e := cloned.(type) { +// case *rangeAggregationExpr: +// mapped, err := m.Map(e.left.left) +// if err != nil { +// return nil, err +// } +// e.left.left = mapped.(LogSelectorExpr) +// return e, nil +// case *vectorAggregationExpr: +// mapped, err := m.Map(e.left) +// if err != nil { +// return nil, err +// } +// e.left = mapped.(SampleExpr) +// return e, nil +// default: +// return nil, errors.Errorf("unexpected expr marked as not parallelizable: %+v", expr) +// } +// } + +// func (m ShardMapper) parallelize(expr Expr) (Expr, error) { +// switch e := expr.(type) { +// case *matchersExpr: +// case *filterExpr: +// case *rangeAggregationExpr: +// case *vectorAggregationExpr: +// default: +// return nil, errors.Errorf("unexpected expr: %+v", expr) +// } +// } + +// DownstreamExpr impls both LogSelectorExpr and SampleExpr in order to transparently +// wrap an expr and signal that it should be executed on a downstream querier. +type DownstreamExpr struct { + shard *int + Expr +} + +func (e DownstreamExpr) Selector() LogSelectorExpr { + return e.Expr.(SampleExpr).Selector() +} + +func (e DownstreamExpr) Filter() (Filter, error) { + return e.Expr.(LogSelectorExpr).Filter() +} + +func (e DownstreamExpr) Matchers() []*labels.Matcher { + return e.Expr.(LogSelectorExpr).Matchers() +} + +// ConcatSampleExpr is a sample expr which is used to signal a list of +// SampleExprs which should be joined +type ConcatSampleExpr struct { + SampleExpr + next *ConcatSampleExpr +} + +// ConcatLogSelectorExpr is a sample expr which is used to signal a list of +// LogSelectorExprs which should be joined +type ConcatLogSelectorExpr struct { + LogSelectorExpr + next *ConcatLogSelectorExpr +} + +// shardedEvaluator is an evaluator which handles shard aware AST nodes +// and embeds a default evaluator otherwise +type shardedEvaluator struct { + shards int + evaluator *defaultEvaluator +} + +// Evaluator returns a StepEvaluator for a given SampleExpr +func (ev *shardedEvaluator) Evaluator( + ctx context.Context, + expr SampleExpr, + params Params, +) (StepEvaluator, error) { + switch e := expr.(type) { + case DownstreamExpr: + // TODO(owen): downstream this and present as StepEvaluator + return nil, errors.New("unimplemented") + case ConcatSampleExpr: + var evaluators []StepEvaluator + for { + eval, err := ev.Evaluator(ctx, e.SampleExpr, params) + if err != nil { + return nil, err + } + evaluators = append(evaluators, eval) + if e.next != nil { + break + } + e = *e.next + } + return ConcatEvaluator(evaluators) + default: + return ev.evaluator.Evaluator(ctx, expr, params) + } +} + +// Iterator returns the iter.EntryIterator for a given LogSelectorExpr +func (ev *shardedEvaluator) Iterator( + ctx context.Context, + expr LogSelectorExpr, + params Params, +) (iter.EntryIterator, error) { + switch e := expr.(type) { + case DownstreamExpr: + // TODO(owen): downstream this and present as iter.EntryIterator + return nil, errors.New("unimplemented") + case ConcatLogSelectorExpr: + var iters []iter.EntryIterator + + for { + iter, err := ev.Iterator(ctx, e.LogSelectorExpr, params) + // TODO(owen): close these iters? + if err != nil { + return nil, err + } + iters = append(iters, iter) + if e.next == nil { + break + } + e = *e.next + } + return iter.NewHeapIterator(ctx, iters, params.Direction()) + default: + return nil, errors.Errorf("unexpected type (%T): %v", e, e) + } +} + +/* +map :: AST -> AST + + +*/ From 1471d0a2aaf3e676674d03395d4b5d808fad481b Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 6 Feb 2020 10:55:15 -0500 Subject: [PATCH 02/21] [wip] continues experimenting with ast mapping --- pkg/logql/evaluator.go | 26 +++++++++++++++ pkg/logql/properties.go | 18 ++--------- pkg/logql/sharding.go | 70 ++++++++++++++++++----------------------- 3 files changed, 59 insertions(+), 55 deletions(-) diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 1846bad12168f..f5cb2fce5fe69 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -592,3 +592,29 @@ func (ev *defaultEvaluator) literalEvaluator( eval.Close, ) } + +// ConcatEvaluator joins multiple StepEvaluators. +// Contract: They must be of identical start, end, and step values. +func ConcatEvaluator(evaluators []StepEvaluator) StepEvaluator { + return newStepEvaluator( + func() (done bool, ts int64, vec promql.Vector) { + var cur promql.Vector + for { + for _, eval := range evaluators { + done, ts, cur = eval.Next() + vec = append(vec, cur...) + } + } + return done, ts, vec + + }, + func() (lastErr error) { + for _, eval := range evaluators { + if err := eval.Close(); err != nil { + lastErr = err + } + } + return lastErr + }, + ) +} diff --git a/pkg/logql/properties.go b/pkg/logql/properties.go index 39977f922fbca..421779e398eb2 100644 --- a/pkg/logql/properties.go +++ b/pkg/logql/properties.go @@ -14,26 +14,14 @@ var parallelOperations = map[string]bool{ OpTypeRate: true, } -// PropertyExpr is an expression which can determine certain properties of an expression -// and also impls ASTMapper -type PropertyExpr interface { - Expr - CanParallel() bool // Whether this expression can be parallelized - -} - -type propertyExpr struct { - Expr -} - -func (e propertyExpr) CanParallel() bool { - switch expr := e.Expr.(type) { +func CanParallel(e Expr) bool { + switch expr := e.(type) { case *matchersExpr, *filterExpr: return true case *rangeAggregationExpr: return parallelOperations[expr.operation] case *vectorAggregationExpr: - return parallelOperations[expr.operation] && propertyExpr{expr.left}.CanParallel() + return parallelOperations[expr.operation] && CanParallel(expr.Left) default: return false } diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index 76e3393ba8de0..d92e7154e9267 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -8,49 +8,39 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -// type shardedLogSelectorExpr struct { -// LogSelectorExpr -// shard int -// } - -// type shardedSampleExpr struct { -// SampleExpr -// shard int -// } - -// type ShardMapper struct { -// shards int -// } +type ShardMapper struct { + shards int +} -// func (m ShardMapper) Map(expr Expr) (Expr, error) { -// cloned, err := CloneExpr(expr) -// if err != nil { -// return nil, err -// } +func (m ShardMapper) Map(expr Expr) (Expr, error) { + cloned, err := CloneExpr(expr) + if err != nil { + return nil, err + } -// if (propertyExpr{cloned}).CanParallel() { -// return m.parallelize(cloned) -// } + if CanParallel(cloned) { + return m.parallelize(cloned) + } -// switch e := cloned.(type) { -// case *rangeAggregationExpr: -// mapped, err := m.Map(e.left.left) -// if err != nil { -// return nil, err -// } -// e.left.left = mapped.(LogSelectorExpr) -// return e, nil -// case *vectorAggregationExpr: -// mapped, err := m.Map(e.left) -// if err != nil { -// return nil, err -// } -// e.left = mapped.(SampleExpr) -// return e, nil -// default: -// return nil, errors.Errorf("unexpected expr marked as not parallelizable: %+v", expr) -// } -// } + switch e := cloned.(type) { + case *rangeAggregationExpr: + mapped, err := m.Map(e.left.left) + if err != nil { + return nil, err + } + e.left.left = mapped.(LogSelectorExpr) + return e, nil + case *vectorAggregationExpr: + mapped, err := m.Map(e.left) + if err != nil { + return nil, err + } + e.left = mapped.(SampleExpr) + return e, nil + default: + return nil, errors.Errorf("unexpected expr marked as not parallelizable: %+v", expr) + } +} // func (m ShardMapper) parallelize(expr Expr) (Expr, error) { // switch e := expr.(type) { From 8260bc389d5082473523cb99fbedfebf2bdf4d43 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 14 Feb 2020 08:08:43 -0500 Subject: [PATCH 03/21] refactoring in preparation for binops --- pkg/logql/properties.go | 2 +- pkg/logql/sharding.go | 160 -------------------------------- pkg/logql/sharding_evaluator.go | 71 ++++++++++++++ pkg/logql/sharding_exprs.go | 25 +++++ pkg/logql/sharding_mapper.go | 5 + 5 files changed, 102 insertions(+), 161 deletions(-) delete mode 100644 pkg/logql/sharding.go create mode 100644 pkg/logql/sharding_evaluator.go create mode 100644 pkg/logql/sharding_exprs.go create mode 100644 pkg/logql/sharding_mapper.go diff --git a/pkg/logql/properties.go b/pkg/logql/properties.go index 421779e398eb2..96cd220409242 100644 --- a/pkg/logql/properties.go +++ b/pkg/logql/properties.go @@ -21,7 +21,7 @@ func CanParallel(e Expr) bool { case *rangeAggregationExpr: return parallelOperations[expr.operation] case *vectorAggregationExpr: - return parallelOperations[expr.operation] && CanParallel(expr.Left) + return parallelOperations[expr.operation] && CanParallel(expr.left) default: return false } diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go deleted file mode 100644 index d92e7154e9267..0000000000000 --- a/pkg/logql/sharding.go +++ /dev/null @@ -1,160 +0,0 @@ -package logql - -import ( - "context" - "errors" - - "github.com/grafana/loki/pkg/iter" - "github.com/prometheus/prometheus/pkg/labels" -) - -type ShardMapper struct { - shards int -} - -func (m ShardMapper) Map(expr Expr) (Expr, error) { - cloned, err := CloneExpr(expr) - if err != nil { - return nil, err - } - - if CanParallel(cloned) { - return m.parallelize(cloned) - } - - switch e := cloned.(type) { - case *rangeAggregationExpr: - mapped, err := m.Map(e.left.left) - if err != nil { - return nil, err - } - e.left.left = mapped.(LogSelectorExpr) - return e, nil - case *vectorAggregationExpr: - mapped, err := m.Map(e.left) - if err != nil { - return nil, err - } - e.left = mapped.(SampleExpr) - return e, nil - default: - return nil, errors.Errorf("unexpected expr marked as not parallelizable: %+v", expr) - } -} - -// func (m ShardMapper) parallelize(expr Expr) (Expr, error) { -// switch e := expr.(type) { -// case *matchersExpr: -// case *filterExpr: -// case *rangeAggregationExpr: -// case *vectorAggregationExpr: -// default: -// return nil, errors.Errorf("unexpected expr: %+v", expr) -// } -// } - -// DownstreamExpr impls both LogSelectorExpr and SampleExpr in order to transparently -// wrap an expr and signal that it should be executed on a downstream querier. -type DownstreamExpr struct { - shard *int - Expr -} - -func (e DownstreamExpr) Selector() LogSelectorExpr { - return e.Expr.(SampleExpr).Selector() -} - -func (e DownstreamExpr) Filter() (Filter, error) { - return e.Expr.(LogSelectorExpr).Filter() -} - -func (e DownstreamExpr) Matchers() []*labels.Matcher { - return e.Expr.(LogSelectorExpr).Matchers() -} - -// ConcatSampleExpr is a sample expr which is used to signal a list of -// SampleExprs which should be joined -type ConcatSampleExpr struct { - SampleExpr - next *ConcatSampleExpr -} - -// ConcatLogSelectorExpr is a sample expr which is used to signal a list of -// LogSelectorExprs which should be joined -type ConcatLogSelectorExpr struct { - LogSelectorExpr - next *ConcatLogSelectorExpr -} - -// shardedEvaluator is an evaluator which handles shard aware AST nodes -// and embeds a default evaluator otherwise -type shardedEvaluator struct { - shards int - evaluator *defaultEvaluator -} - -// Evaluator returns a StepEvaluator for a given SampleExpr -func (ev *shardedEvaluator) Evaluator( - ctx context.Context, - expr SampleExpr, - params Params, -) (StepEvaluator, error) { - switch e := expr.(type) { - case DownstreamExpr: - // TODO(owen): downstream this and present as StepEvaluator - return nil, errors.New("unimplemented") - case ConcatSampleExpr: - var evaluators []StepEvaluator - for { - eval, err := ev.Evaluator(ctx, e.SampleExpr, params) - if err != nil { - return nil, err - } - evaluators = append(evaluators, eval) - if e.next != nil { - break - } - e = *e.next - } - return ConcatEvaluator(evaluators) - default: - return ev.evaluator.Evaluator(ctx, expr, params) - } -} - -// Iterator returns the iter.EntryIterator for a given LogSelectorExpr -func (ev *shardedEvaluator) Iterator( - ctx context.Context, - expr LogSelectorExpr, - params Params, -) (iter.EntryIterator, error) { - switch e := expr.(type) { - case DownstreamExpr: - // TODO(owen): downstream this and present as iter.EntryIterator - return nil, errors.New("unimplemented") - case ConcatLogSelectorExpr: - var iters []iter.EntryIterator - - for { - iter, err := ev.Iterator(ctx, e.LogSelectorExpr, params) - // TODO(owen): close these iters? - if err != nil { - return nil, err - } - iters = append(iters, iter) - if e.next == nil { - break - } - e = *e.next - } - return iter.NewHeapIterator(ctx, iters, params.Direction()) - default: - return nil, errors.Errorf("unexpected type (%T): %v", e, e) - } -} - -/* -map :: AST -> AST - - -*/ diff --git a/pkg/logql/sharding_evaluator.go b/pkg/logql/sharding_evaluator.go new file mode 100644 index 0000000000000..c5452863c860b --- /dev/null +++ b/pkg/logql/sharding_evaluator.go @@ -0,0 +1,71 @@ +package logql + +import ( + "context" + "fmt" + + "github.com/grafana/loki/pkg/iter" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/promql" +) + +// downstreamEvaluator is an evaluator which handles shard aware AST nodes +// and embeds a default evaluator otherwise +type downstreamEvaluator struct { + shards int + evaluator *defaultEvaluator +} + +// Evaluator returns a StepEvaluator for a given SampleExpr +func (ev *downstreamEvaluator) Evaluator( + ctx context.Context, + expr SampleExpr, + params Params, +) (StepEvaluator, error) { + switch e := expr.(type) { + case DownstreamSampleExpr: + // determine type (SampleExpr, LogSelectorExpr) and downstream to a querier + return nil, errors.New("unimplemented") + case ConcatSampleExpr: + // ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat + return nil, errors.New("unimplemented") + default: + // used for aggregating downstreamed exprs, literalExprs + return ev.evaluator.Evaluator(ctx, expr, params) + } +} + +// Iterator returns the iter.EntryIterator for a given LogSelectorExpr +func (ev *downstreamEvaluator) Iterator( + _ context.Context, + expr LogSelectorExpr, + _ Params, +) (iter.EntryIterator, error) { + return nil, fmt.Errorf("downstreamEvaluator does not implement Iterator, called with expr: %+v", expr) +} + +// ConcatEvaluator joins multiple StepEvaluators. +// Contract: They must be of identical start, end, and step values. +func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { + return newStepEvaluator( + func() (done bool, ts int64, vec promql.Vector) { + var cur promql.Vector + for { + for _, eval := range evaluators { + done, ts, cur = eval.Next() + vec = append(vec, cur...) + } + } + return done, ts, vec + + }, + func() (lastErr error) { + for _, eval := range evaluators { + if err := eval.Close(); err != nil { + lastErr = err + } + } + return lastErr + }, + ) +} diff --git a/pkg/logql/sharding_exprs.go b/pkg/logql/sharding_exprs.go new file mode 100644 index 0000000000000..b37164920137d --- /dev/null +++ b/pkg/logql/sharding_exprs.go @@ -0,0 +1,25 @@ +package logql + +// DownstreamSampleExpr is a SampleExpr which signals downstream computation +type DownstreamSampleExpr struct { + shard *int + SampleExpr +} + +// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation +type DownstreamLogSelectorExpr struct { + shard *int + LogSelectorExpr +} + +// ConcatSampleExpr is an expr for concatenating multiple SampleExpr +type ConcatSampleExpr struct { + SampleExpr + next *ConcatSampleExpr +} + +// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr +type ConcatLogSelectorExpr struct { + LogSelectorExpr + next *ConcatLogSelectorExpr +} diff --git a/pkg/logql/sharding_mapper.go b/pkg/logql/sharding_mapper.go new file mode 100644 index 0000000000000..bd63cb5167ee2 --- /dev/null +++ b/pkg/logql/sharding_mapper.go @@ -0,0 +1,5 @@ +package logql + +type ShardMapper struct { + shards int +} From b71472b24a7de6360c5d57285efa3706c6aaa4d1 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 19 Mar 2020 15:06:56 -0400 Subject: [PATCH 04/21] evaluators can pass state to other evaluators --- pkg/logql/ast.go | 2 +- pkg/logql/evaluator.go | 87 +++++++++++++++++++++++++----------------- 2 files changed, 52 insertions(+), 37 deletions(-) diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index efd6c2f522223..91e3c8c5ade60 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -386,7 +386,7 @@ func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr { // This is because literals need match all labels, which is currently difficult to encode into StepEvaluators. // Therefore, we ensure a binop can be reduced/simplified, maintaining the invariant that it does not have two literal legs. func reduceBinOp(op string, left, right *literalExpr) *literalExpr { - merged := (&defaultEvaluator{}).mergeBinOp( + merged := mergeBinOp( op, &promql.Sample{Point: promql.Point{V: left.value}}, &promql.Sample{Point: promql.Point{V: right.value}}, diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index f5cb2fce5fe69..c25bd5f87edb7 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -69,8 +69,9 @@ func GetRangeType(q Params) QueryRangeType { // Evaluator is an interface for iterating over data at different nodes in the AST type Evaluator interface { - // Evaluator returns a StepEvaluator for a given SampleExpr - Evaluator(context.Context, SampleExpr, Params) (StepEvaluator, error) + // Evaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another Evaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible + // Evaluator implementations which can be composed. + Evaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error) // Iterator returns the iter.EntryIterator for a given LogSelectorExpr Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error) } @@ -99,21 +100,43 @@ func (ev *defaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, } -func (ev *defaultEvaluator) Evaluator(ctx context.Context, expr SampleExpr, q Params) (StepEvaluator, error) { +func (ev *defaultEvaluator) Evaluator( + ctx context.Context, + nextEv Evaluator, + expr SampleExpr, + q Params, +) (StepEvaluator, error) { switch e := expr.(type) { case *vectorAggregationExpr: - return ev.vectorAggEvaluator(ctx, e, q) + return vectorAggEvaluator(ctx, nextEv, e, q) case *rangeAggregationExpr: - return ev.rangeAggEvaluator(ctx, e, q) + entryIter, err := ev.querier.Select(ctx, SelectParams{ + &logproto.QueryRequest{ + Start: q.Start().Add(-e.left.interval), + End: q.End(), + Limit: 0, + Direction: logproto.FORWARD, + Selector: expr.Selector().String(), + }, + }) + if err != nil { + return nil, err + } + return rangeAggEvaluator(ctx, entryIter, e, q) case *binOpExpr: - return ev.binOpEvaluator(ctx, e, q) + return binOpStepEvaluator(ctx, nextEv, e, q) default: return nil, errors.Errorf("unexpected type (%T): %v", e, e) } } -func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vectorAggregationExpr, q Params) (StepEvaluator, error) { - nextEvaluator, err := ev.Evaluator(ctx, expr.left, q) +func vectorAggEvaluator( + ctx context.Context, + ev Evaluator, + expr *vectorAggregationExpr, + q Params, +) (StepEvaluator, error) { + nextEvaluator, err := ev.Evaluator(ctx, ev, expr.left, q) if err != nil { return nil, err } @@ -302,21 +325,12 @@ func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vector }, nextEvaluator.Close) } -func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAggregationExpr, q Params) (StepEvaluator, error) { - entryIter, err := ev.querier.Select(ctx, SelectParams{ - &logproto.QueryRequest{ - Start: q.Start().Add(-expr.left.interval), - End: q.End(), - Limit: 0, - Direction: logproto.FORWARD, - Selector: expr.Selector().String(), - }, - }) - - if err != nil { - return nil, err - } - +func rangeAggEvaluator( + ctx context.Context, + entryIter iter.EntryIterator, + expr *rangeAggregationExpr, + q Params, +) (StepEvaluator, error) { vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(), q.Start().UnixNano(), q.End().UnixNano()) @@ -341,8 +355,9 @@ func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAg // binOpExpr explicly does not handle when both legs are literals as // it makes the type system simpler and these are reduced in mustNewBinOpExpr -func (ev *defaultEvaluator) binOpEvaluator( +func binOpStepEvaluator( ctx context.Context, + ev Evaluator, expr *binOpExpr, q Params, ) (StepEvaluator, error) { @@ -352,26 +367,26 @@ func (ev *defaultEvaluator) binOpEvaluator( // match a literal expr with all labels in the other leg if lOk { - rhs, err := ev.Evaluator(ctx, expr.RHS, q) + rhs, err := ev.Evaluator(ctx, ev, expr.RHS, q) if err != nil { return nil, err } - return ev.literalEvaluator(expr.op, leftLit, rhs, false) + return literalStepEvaluator(expr.op, leftLit, rhs, false) } if rOk { - lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q) + lhs, err := ev.Evaluator(ctx, ev, expr.SampleExpr, q) if err != nil { return nil, err } - return ev.literalEvaluator(expr.op, rightLit, lhs, true) + return literalStepEvaluator(expr.op, rightLit, lhs, true) } // we have two non literal legs - lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q) + lhs, err := ev.Evaluator(ctx, ev, expr.SampleExpr, q) if err != nil { return nil, err } - rhs, err := ev.Evaluator(ctx, expr.RHS, q) + rhs, err := ev.Evaluator(ctx, ev, expr.RHS, q) if err != nil { return nil, err } @@ -409,7 +424,7 @@ func (ev *defaultEvaluator) binOpEvaluator( for _, pair := range pairs { // merge - if merged := ev.mergeBinOp(expr.op, pair[0], pair[1]); merged != nil { + if merged := mergeBinOp(expr.op, pair[0], pair[1]); merged != nil { results = append(results, *merged) } } @@ -425,7 +440,7 @@ func (ev *defaultEvaluator) binOpEvaluator( }) } -func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *promql.Sample { +func mergeBinOp(op string, left, right *promql.Sample) *promql.Sample { var merger func(left, right *promql.Sample) *promql.Sample switch op { @@ -554,9 +569,9 @@ func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *p } -// literalEvaluator merges a literal with a StepEvaluator. Since order matters in +// literalStepEvaluator merges a literal with a StepEvaluator. Since order matters in // non commutative operations, inverted should be true when the literalExpr is not the left argument. -func (ev *defaultEvaluator) literalEvaluator( +func literalStepEvaluator( op string, lit *literalExpr, eval StepEvaluator, @@ -578,7 +593,7 @@ func (ev *defaultEvaluator) literalEvaluator( left, right = right, left } - if merged := ev.mergeBinOp( + if merged := mergeBinOp( op, left, right, @@ -595,7 +610,7 @@ func (ev *defaultEvaluator) literalEvaluator( // ConcatEvaluator joins multiple StepEvaluators. // Contract: They must be of identical start, end, and step values. -func ConcatEvaluator(evaluators []StepEvaluator) StepEvaluator { +func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { return newStepEvaluator( func() (done bool, ts int64, vec promql.Vector) { var cur promql.Vector From 542563090272c09109a3728cbc821534ab99e9cd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 19 Mar 2020 17:13:15 -0400 Subject: [PATCH 05/21] compiler alignment --- pkg/logql/engine.go | 2 +- pkg/logql/evaluator_test.go | 5 ++--- pkg/logql/sharding_evaluator.go | 35 ++++----------------------------- 3 files changed, 7 insertions(+), 35 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 0753173148368..444f11775ced5 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -212,7 +212,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr return ng.evalLiteral(ctx, lit, q) } - stepEvaluator, err := ng.evaluator.Evaluator(ctx, expr, q) + stepEvaluator, err := ng.evaluator.Evaluator(ctx, ng.evaluator, expr, q) if err != nil { return nil, err } diff --git a/pkg/logql/evaluator_test.go b/pkg/logql/evaluator_test.go index 23ef3d2f71c94..a25a72e6ae85e 100644 --- a/pkg/logql/evaluator_test.go +++ b/pkg/logql/evaluator_test.go @@ -9,9 +9,8 @@ import ( ) func TestDefaultEvaluator_DivideByZero(t *testing.T) { - ev := &defaultEvaluator{} - require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeDiv, + require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeDiv, &promql.Sample{ Point: promql.Point{T: 1, V: 1}, }, @@ -20,7 +19,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) { }, ).Point.V)) - require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeMod, + require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeMod, &promql.Sample{ Point: promql.Point{T: 1, V: 1}, }, diff --git a/pkg/logql/sharding_evaluator.go b/pkg/logql/sharding_evaluator.go index c5452863c860b..e5d7dd9958758 100644 --- a/pkg/logql/sharding_evaluator.go +++ b/pkg/logql/sharding_evaluator.go @@ -6,23 +6,22 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/pkg/errors" - "github.com/prometheus/prometheus/promql" ) // downstreamEvaluator is an evaluator which handles shard aware AST nodes // and embeds a default evaluator otherwise type downstreamEvaluator struct { - shards int - evaluator *defaultEvaluator + shards int } // Evaluator returns a StepEvaluator for a given SampleExpr func (ev *downstreamEvaluator) Evaluator( ctx context.Context, + nextEv Evaluator, expr SampleExpr, params Params, ) (StepEvaluator, error) { - switch e := expr.(type) { + switch expr.(type) { case DownstreamSampleExpr: // determine type (SampleExpr, LogSelectorExpr) and downstream to a querier return nil, errors.New("unimplemented") @@ -31,7 +30,7 @@ func (ev *downstreamEvaluator) Evaluator( return nil, errors.New("unimplemented") default: // used for aggregating downstreamed exprs, literalExprs - return ev.evaluator.Evaluator(ctx, expr, params) + return nextEv.Evaluator(ctx, nextEv, expr, params) } } @@ -43,29 +42,3 @@ func (ev *downstreamEvaluator) Iterator( ) (iter.EntryIterator, error) { return nil, fmt.Errorf("downstreamEvaluator does not implement Iterator, called with expr: %+v", expr) } - -// ConcatEvaluator joins multiple StepEvaluators. -// Contract: They must be of identical start, end, and step values. -func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { - return newStepEvaluator( - func() (done bool, ts int64, vec promql.Vector) { - var cur promql.Vector - for { - for _, eval := range evaluators { - done, ts, cur = eval.Next() - vec = append(vec, cur...) - } - } - return done, ts, vec - - }, - func() (lastErr error) { - for _, eval := range evaluators { - if err := eval.Close(); err != nil { - lastErr = err - } - } - return lastErr - }, - ) -} From 632aecfa0eac8f4b3589a20b3da18a339075af60 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 19 Mar 2020 17:31:08 -0400 Subject: [PATCH 06/21] Evaluator method renamed to StepEvaluator --- pkg/logql/engine.go | 2 +- pkg/logql/evaluator.go | 20 ++++++++++---------- pkg/logql/sharding_evaluator.go | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 444f11775ced5..39d2618abee40 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -212,7 +212,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr return ng.evalLiteral(ctx, lit, q) } - stepEvaluator, err := ng.evaluator.Evaluator(ctx, ng.evaluator, expr, q) + stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q) if err != nil { return nil, err } diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index c25bd5f87edb7..57165302d06c7 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -67,11 +67,11 @@ func GetRangeType(q Params) QueryRangeType { return RangeType } -// Evaluator is an interface for iterating over data at different nodes in the AST +// StepEvaluator is an interface for iterating over data at different nodes in the AST type Evaluator interface { - // Evaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another Evaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible - // Evaluator implementations which can be composed. - Evaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error) + // StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible + // StepEvaluator implementations which can be composed. + StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error) // Iterator returns the iter.EntryIterator for a given LogSelectorExpr Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error) } @@ -100,7 +100,7 @@ func (ev *defaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, } -func (ev *defaultEvaluator) Evaluator( +func (ev *defaultEvaluator) StepEvaluator( ctx context.Context, nextEv Evaluator, expr SampleExpr, @@ -136,7 +136,7 @@ func vectorAggEvaluator( expr *vectorAggregationExpr, q Params, ) (StepEvaluator, error) { - nextEvaluator, err := ev.Evaluator(ctx, ev, expr.left, q) + nextEvaluator, err := ev.StepEvaluator(ctx, ev, expr.left, q) if err != nil { return nil, err } @@ -367,14 +367,14 @@ func binOpStepEvaluator( // match a literal expr with all labels in the other leg if lOk { - rhs, err := ev.Evaluator(ctx, ev, expr.RHS, q) + rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q) if err != nil { return nil, err } return literalStepEvaluator(expr.op, leftLit, rhs, false) } if rOk { - lhs, err := ev.Evaluator(ctx, ev, expr.SampleExpr, q) + lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q) if err != nil { return nil, err } @@ -382,11 +382,11 @@ func binOpStepEvaluator( } // we have two non literal legs - lhs, err := ev.Evaluator(ctx, ev, expr.SampleExpr, q) + lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q) if err != nil { return nil, err } - rhs, err := ev.Evaluator(ctx, ev, expr.RHS, q) + rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q) if err != nil { return nil, err } diff --git a/pkg/logql/sharding_evaluator.go b/pkg/logql/sharding_evaluator.go index e5d7dd9958758..9cfe2b2558d4b 100644 --- a/pkg/logql/sharding_evaluator.go +++ b/pkg/logql/sharding_evaluator.go @@ -15,7 +15,7 @@ type downstreamEvaluator struct { } // Evaluator returns a StepEvaluator for a given SampleExpr -func (ev *downstreamEvaluator) Evaluator( +func (ev *downstreamEvaluator) StepEvaluator( ctx context.Context, nextEv Evaluator, expr SampleExpr, @@ -30,7 +30,7 @@ func (ev *downstreamEvaluator) Evaluator( return nil, errors.New("unimplemented") default: // used for aggregating downstreamed exprs, literalExprs - return nextEv.Evaluator(ctx, nextEv, expr, params) + return nextEv.StepEvaluator(ctx, nextEv, expr, params) } } From ec6ff833c844e79a52bea56d497fb919f6a4cb72 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 19 Mar 2020 18:16:56 -0400 Subject: [PATCH 07/21] chained evaluator impl --- pkg/logql/chainedevaluator.go | 53 +++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 pkg/logql/chainedevaluator.go diff --git a/pkg/logql/chainedevaluator.go b/pkg/logql/chainedevaluator.go new file mode 100644 index 0000000000000..346dd6992cb4f --- /dev/null +++ b/pkg/logql/chainedevaluator.go @@ -0,0 +1,53 @@ +package logql + +import ( + "context" + + "github.com/grafana/loki/pkg/iter" + "github.com/pkg/errors" +) + +// ChainedEvaluator is an evaluator which chains multiple other evaluators, +// deferring to the first successful one. +type ChainedEvaluator struct { + evaluators []Evaluator +} + +// StepEvaluator attempts the embedded evaluators until one succeeds or they all error. +func (c *ChainedEvaluator) StepEvaluator( + ctx context.Context, + nextEvaluator Evaluator, + expr SampleExpr, + p Params, +) (stepper StepEvaluator, err error) { + for _, eval := range c.evaluators { + if stepper, err = eval.StepEvaluator(ctx, nextEvaluator, expr, p); err == nil { + return stepper, nil + } + } + return nil, err +} + +// Iterator attempts the embedded evaluators until one succeeds or they all error. +func (c *ChainedEvaluator) Iterator( + ctx context.Context, + expr LogSelectorExpr, + p Params, +) (iterator iter.EntryIterator, err error) { + for _, eval := range c.evaluators { + if iterator, err = eval.Iterator(ctx, expr, p); err == nil { + return iterator, nil + } + } + return nil, err +} + +// NewChainedEvaluator constructs a ChainedEvaluator from one or more Evaluators +func NewChainedEvaluator(evals ...Evaluator) (*ChainedEvaluator, error) { + if len(evals) == 0 { + return nil, errors.New("must supply an Evaluator") + } + return &ChainedEvaluator{ + evaluators: evals, + }, nil +} From e15ed28557c793cf513c892903b9eb9d8f69fe10 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 20 Mar 2020 15:29:29 -0400 Subject: [PATCH 08/21] tidying up sharding code --- pkg/logql/evaluator.go | 33 +++--------- pkg/logql/sharding.go | 93 +++++++++++++++++++++++++++++++++ pkg/logql/sharding_evaluator.go | 44 ---------------- pkg/logql/sharding_exprs.go | 25 --------- pkg/logql/sharding_mapper.go | 5 -- 5 files changed, 99 insertions(+), 101 deletions(-) create mode 100644 pkg/logql/sharding.go delete mode 100644 pkg/logql/sharding_evaluator.go delete mode 100644 pkg/logql/sharding_exprs.go delete mode 100644 pkg/logql/sharding_mapper.go diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 57165302d06c7..f0e39d5238866 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -76,6 +76,11 @@ type Evaluator interface { Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error) } +// EvaluatorUnsupportedType is a helper for signaling that an evaluator does not support an Expr type +func EvaluatorUnsupportedType(expr Expr, ev Evaluator) error { + return errors.Errorf("unexpected expr type (%T) for Evaluator type (%T) ", expr, ev) +} + type defaultEvaluator struct { maxLookBackPeriod time.Duration querier Querier @@ -126,7 +131,7 @@ func (ev *defaultEvaluator) StepEvaluator( case *binOpExpr: return binOpStepEvaluator(ctx, nextEv, e, q) default: - return nil, errors.Errorf("unexpected type (%T): %v", e, e) + return nil, EvaluatorUnsupportedType(e, ev) } } @@ -607,29 +612,3 @@ func literalStepEvaluator( eval.Close, ) } - -// ConcatEvaluator joins multiple StepEvaluators. -// Contract: They must be of identical start, end, and step values. -func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { - return newStepEvaluator( - func() (done bool, ts int64, vec promql.Vector) { - var cur promql.Vector - for { - for _, eval := range evaluators { - done, ts, cur = eval.Next() - vec = append(vec, cur...) - } - } - return done, ts, vec - - }, - func() (lastErr error) { - for _, eval := range evaluators { - if err := eval.Close(); err != nil { - lastErr = err - } - } - return lastErr - }, - ) -} diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go new file mode 100644 index 0000000000000..a28a611d1b754 --- /dev/null +++ b/pkg/logql/sharding.go @@ -0,0 +1,93 @@ +package logql + +import ( + "context" + "fmt" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/iter" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/promql" +) + +// DownstreamSampleExpr is a SampleExpr which signals downstream computation +type DownstreamSampleExpr struct { + shard *astmapper.ShardAnnotation + SampleExpr +} + +// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation +type DownstreamLogSelectorExpr struct { + shard *astmapper.ShardAnnotation + LogSelectorExpr +} + +// ConcatSampleExpr is an expr for concatenating multiple SampleExpr +type ConcatSampleExpr struct { + SampleExpr + next *ConcatSampleExpr +} + +// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr +type ConcatLogSelectorExpr struct { + LogSelectorExpr + next *ConcatLogSelectorExpr +} + +// downstreamEvaluator is an evaluator which handles shard aware AST nodes +// and embeds a default evaluator otherwise +type downstreamEvaluator struct { + shards int +} + +// Evaluator returns a StepEvaluator for a given SampleExpr +func (ev *downstreamEvaluator) StepEvaluator( + ctx context.Context, + nextEv Evaluator, + expr SampleExpr, + params Params, +) (StepEvaluator, error) { + switch expr.(type) { + case DownstreamSampleExpr: + // determine type (SampleExpr, LogSelectorExpr) and downstream to a querier + return nil, errors.New("unimplemented") + case ConcatSampleExpr: + // ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat + return nil, errors.New("unimplemented") + default: + return nil, EvaluatorUnsupportedType(expr, ev) + } +} + +// Iterator returns the iter.EntryIterator for a given LogSelectorExpr +func (ev *downstreamEvaluator) Iterator( + _ context.Context, + expr LogSelectorExpr, + _ Params, +) (iter.EntryIterator, error) { + return nil, fmt.Errorf("downstreamEvaluator does not implement Iterator, called with expr: %+v", expr) +} + +// ConcatEvaluator joins multiple StepEvaluators. +// Contract: They must be of identical start, end, and step values. +func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { + return newStepEvaluator( + func() (done bool, ts int64, vec promql.Vector) { + var cur promql.Vector + for _, eval := range evaluators { + done, ts, cur = eval.Next() + vec = append(vec, cur...) + } + return done, ts, vec + + }, + func() (lastErr error) { + for _, eval := range evaluators { + if err := eval.Close(); err != nil { + lastErr = err + } + } + return lastErr + }, + ) +} diff --git a/pkg/logql/sharding_evaluator.go b/pkg/logql/sharding_evaluator.go deleted file mode 100644 index 9cfe2b2558d4b..0000000000000 --- a/pkg/logql/sharding_evaluator.go +++ /dev/null @@ -1,44 +0,0 @@ -package logql - -import ( - "context" - "fmt" - - "github.com/grafana/loki/pkg/iter" - "github.com/pkg/errors" -) - -// downstreamEvaluator is an evaluator which handles shard aware AST nodes -// and embeds a default evaluator otherwise -type downstreamEvaluator struct { - shards int -} - -// Evaluator returns a StepEvaluator for a given SampleExpr -func (ev *downstreamEvaluator) StepEvaluator( - ctx context.Context, - nextEv Evaluator, - expr SampleExpr, - params Params, -) (StepEvaluator, error) { - switch expr.(type) { - case DownstreamSampleExpr: - // determine type (SampleExpr, LogSelectorExpr) and downstream to a querier - return nil, errors.New("unimplemented") - case ConcatSampleExpr: - // ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat - return nil, errors.New("unimplemented") - default: - // used for aggregating downstreamed exprs, literalExprs - return nextEv.StepEvaluator(ctx, nextEv, expr, params) - } -} - -// Iterator returns the iter.EntryIterator for a given LogSelectorExpr -func (ev *downstreamEvaluator) Iterator( - _ context.Context, - expr LogSelectorExpr, - _ Params, -) (iter.EntryIterator, error) { - return nil, fmt.Errorf("downstreamEvaluator does not implement Iterator, called with expr: %+v", expr) -} diff --git a/pkg/logql/sharding_exprs.go b/pkg/logql/sharding_exprs.go deleted file mode 100644 index b37164920137d..0000000000000 --- a/pkg/logql/sharding_exprs.go +++ /dev/null @@ -1,25 +0,0 @@ -package logql - -// DownstreamSampleExpr is a SampleExpr which signals downstream computation -type DownstreamSampleExpr struct { - shard *int - SampleExpr -} - -// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation -type DownstreamLogSelectorExpr struct { - shard *int - LogSelectorExpr -} - -// ConcatSampleExpr is an expr for concatenating multiple SampleExpr -type ConcatSampleExpr struct { - SampleExpr - next *ConcatSampleExpr -} - -// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr -type ConcatLogSelectorExpr struct { - LogSelectorExpr - next *ConcatLogSelectorExpr -} diff --git a/pkg/logql/sharding_mapper.go b/pkg/logql/sharding_mapper.go deleted file mode 100644 index bd63cb5167ee2..0000000000000 --- a/pkg/logql/sharding_mapper.go +++ /dev/null @@ -1,5 +0,0 @@ -package logql - -type ShardMapper struct { - shards int -} From 2565ca6c661a969be012caffe8ca51eb1849e2fe Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 22 Mar 2020 10:54:22 -0400 Subject: [PATCH 09/21] handling for ConcatSampleExpr --- pkg/logql/sharding.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index a28a611d1b754..e5815d4928581 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -35,10 +35,7 @@ type ConcatLogSelectorExpr struct { } // downstreamEvaluator is an evaluator which handles shard aware AST nodes -// and embeds a default evaluator otherwise -type downstreamEvaluator struct { - shards int -} +type downstreamEvaluator struct{} // Evaluator returns a StepEvaluator for a given SampleExpr func (ev *downstreamEvaluator) StepEvaluator( @@ -47,13 +44,27 @@ func (ev *downstreamEvaluator) StepEvaluator( expr SampleExpr, params Params, ) (StepEvaluator, error) { - switch expr.(type) { + switch e := expr.(type) { case DownstreamSampleExpr: // determine type (SampleExpr, LogSelectorExpr) and downstream to a querier return nil, errors.New("unimplemented") + case ConcatSampleExpr: // ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat - return nil, errors.New("unimplemented") + var xs []StepEvaluator + cur := &e + + for cur != nil { + eval, err := ev.StepEvaluator(ctx, nextEv, cur.SampleExpr, params) + if err != nil { + return nil, err + } + xs = append(xs, eval) + cur = cur.next + } + + return ConcatEvaluator(xs) + default: return nil, EvaluatorUnsupportedType(expr, ev) } From 52e1cfaac6bea214b5e2fd6ef90c8e4cb565d6b5 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 22 Mar 2020 14:22:00 -0400 Subject: [PATCH 10/21] downstream iterator --- pkg/logql/sharding.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index e5815d4928581..88ebb3ad4c11b 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -2,7 +2,6 @@ package logql import ( "context" - "fmt" "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/grafana/loki/pkg/iter" @@ -46,7 +45,7 @@ func (ev *downstreamEvaluator) StepEvaluator( ) (StepEvaluator, error) { switch e := expr.(type) { case DownstreamSampleExpr: - // determine type (SampleExpr, LogSelectorExpr) and downstream to a querier + // downstream to a querier return nil, errors.New("unimplemented") case ConcatSampleExpr: @@ -72,11 +71,25 @@ func (ev *downstreamEvaluator) StepEvaluator( // Iterator returns the iter.EntryIterator for a given LogSelectorExpr func (ev *downstreamEvaluator) Iterator( - _ context.Context, + ctx context.Context, expr LogSelectorExpr, - _ Params, + params Params, ) (iter.EntryIterator, error) { - return nil, fmt.Errorf("downstreamEvaluator does not implement Iterator, called with expr: %+v", expr) + switch e := expr.(type) { + case DownstreamLogSelectorExpr: + case ConcatLogSelectorExpr: + var iters []iter.EntryIterator + cur := &e + for cur != nil { + iterator, err := ev.Iterator(ctx, e, params) + if err != nil { + return nil, err + } + iters = append(iters, iterator) + } + return iter.NewHeapIterator(ctx, iters, params.Direction()), nil + } + return nil, errors.New("unimplemented") } // ConcatEvaluator joins multiple StepEvaluators. From ff55f63dd3f603c663f37cf7fe1f766c57487bb0 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 22 Mar 2020 14:57:32 -0400 Subject: [PATCH 11/21] structure for downstreaming asts --- pkg/logql/downstreamer.go | 7 +++++++ pkg/logql/sharding.go | 12 +++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 pkg/logql/downstreamer.go diff --git a/pkg/logql/downstreamer.go b/pkg/logql/downstreamer.go new file mode 100644 index 0000000000000..96fdef69dc919 --- /dev/null +++ b/pkg/logql/downstreamer.go @@ -0,0 +1,7 @@ +package logql + +// Downstreamer is an interface for deferring responsibility for query execution. +// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs. +type Downstreamer interface { + // Downstream(*LokiRequest) (*LokiResponse, error) +} diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index 88ebb3ad4c11b..1af369cb7653d 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -34,7 +34,7 @@ type ConcatLogSelectorExpr struct { } // downstreamEvaluator is an evaluator which handles shard aware AST nodes -type downstreamEvaluator struct{} +type downstreamEvaluator struct{ Downstreamer } // Evaluator returns a StepEvaluator for a given SampleExpr func (ev *downstreamEvaluator) StepEvaluator( @@ -56,6 +56,10 @@ func (ev *downstreamEvaluator) StepEvaluator( for cur != nil { eval, err := ev.StepEvaluator(ctx, nextEv, cur.SampleExpr, params) if err != nil { + // Close previously opened StepEvaluators + for _, x := range xs { + x.Close() + } return nil, err } xs = append(xs, eval) @@ -77,12 +81,18 @@ func (ev *downstreamEvaluator) Iterator( ) (iter.EntryIterator, error) { switch e := expr.(type) { case DownstreamLogSelectorExpr: + // downstream to a querier + return nil, errors.New("unimplemented") case ConcatLogSelectorExpr: var iters []iter.EntryIterator cur := &e for cur != nil { iterator, err := ev.Iterator(ctx, e, params) if err != nil { + // Close previously opened StepEvaluators + for _, x := range iters { + x.Close() + } return nil, err } iters = append(iters, iterator) From df5a69a8aaaa09434d2bbbf8ed69cf8cf90b3db0 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 22 Mar 2020 16:49:38 -0400 Subject: [PATCH 12/21] outlines sharding optimizations --- pkg/logql/astmapper.go | 33 +++++++++++++++++++++++++++++ pkg/logql/properties.go | 28 ------------------------ pkg/logql/sharding_optimizations.go | 19 +++++++++++++++++ 3 files changed, 52 insertions(+), 28 deletions(-) delete mode 100644 pkg/logql/properties.go create mode 100644 pkg/logql/sharding_optimizations.go diff --git a/pkg/logql/astmapper.go b/pkg/logql/astmapper.go index 4e1a91324fd89..3f4693d0970c0 100644 --- a/pkg/logql/astmapper.go +++ b/pkg/logql/astmapper.go @@ -9,3 +9,36 @@ type ASTMapper interface { func CloneExpr(expr Expr) (Expr, error) { return ParseExpr(expr.String()) } + +type ShardMapper struct { + shards int +} + +func (m ShardMapper) Map(expr Expr) (Expr, error) { return nil, nil } + +// technically, std{dev,var} are also parallelizable if there is no cross-shard merging +// in descendent nodes in the AST. This optimization is currently avoided for simplicity. +var parallelOperations = map[string]bool{ + OpTypeSum: true, + OpTypeAvg: true, + OpTypeMax: true, + OpTypeMin: true, + OpTypeCount: true, + OpTypeBottomK: true, + OpTypeTopK: true, + OpTypeCountOverTime: true, + OpTypeRate: true, +} + +func CanParallel(e Expr) bool { + switch expr := e.(type) { + case *matchersExpr, *filterExpr: + return true + case *rangeAggregationExpr: + return parallelOperations[expr.operation] + case *vectorAggregationExpr: + return parallelOperations[expr.operation] && CanParallel(expr.left) + default: + return false + } +} diff --git a/pkg/logql/properties.go b/pkg/logql/properties.go deleted file mode 100644 index 96cd220409242..0000000000000 --- a/pkg/logql/properties.go +++ /dev/null @@ -1,28 +0,0 @@ -package logql - -// technically, std{dev,var} are also parallelizable if there is no cross-shard merging -// in descendent nodes in the AST. This optimization is currently avoided for simplicity. -var parallelOperations = map[string]bool{ - OpTypeSum: true, - OpTypeAvg: true, - OpTypeMax: true, - OpTypeMin: true, - OpTypeCount: true, - OpTypeBottomK: true, - OpTypeTopK: true, - OpTypeCountOverTime: true, - OpTypeRate: true, -} - -func CanParallel(e Expr) bool { - switch expr := e.(type) { - case *matchersExpr, *filterExpr: - return true - case *rangeAggregationExpr: - return parallelOperations[expr.operation] - case *vectorAggregationExpr: - return parallelOperations[expr.operation] && CanParallel(expr.left) - default: - return false - } -} diff --git a/pkg/logql/sharding_optimizations.go b/pkg/logql/sharding_optimizations.go new file mode 100644 index 0000000000000..2d045ac32907d --- /dev/null +++ b/pkg/logql/sharding_optimizations.go @@ -0,0 +1,19 @@ +package logql + +// avg(x) -> sum(x)/count(x) + +// sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) + +// max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...) + +// min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...) + +// count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...) + +// topk(x) -> topk(topk(x, shard=1) ++ topk(x, shard=2)...) + +// botk(x) -> botk(botk(x, shard=1) ++ botk(x, shard=2)...) + +// rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... + +// count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... From ec050ac4f46be2ce58c3e0379d14684bd7555b56 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 22 Mar 2020 22:23:47 -0400 Subject: [PATCH 13/21] work on sharding mapper --- pkg/logql/astmapper.go | 106 ++++++++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 22 deletions(-) diff --git a/pkg/logql/astmapper.go b/pkg/logql/astmapper.go index 3f4693d0970c0..c3a4834327fb3 100644 --- a/pkg/logql/astmapper.go +++ b/pkg/logql/astmapper.go @@ -1,5 +1,12 @@ package logql +import ( + "fmt" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/pkg/errors" +) + // ASTMapper is the exported interface for mapping between multiple AST representations type ASTMapper interface { Map(Expr) (Expr, error) @@ -14,31 +21,86 @@ type ShardMapper struct { shards int } -func (m ShardMapper) Map(expr Expr) (Expr, error) { return nil, nil } +func (m ShardMapper) Map(expr Expr) (Expr, error) { + switch e := expr.(type) { + case *literalExpr: + return e, nil + case *matchersExpr, *filterExpr: + return m.mapLogSelectorExpr(e.(LogSelectorExpr)), nil + case *vectorAggregationExpr: + return m.mapVectorAggregationExpr(e) + case *rangeAggregationExpr: + return m.mapRangeAggregationExpr(e) + case *binOpExpr: + lhsMapped, err := m.Map(e.SampleExpr) + if err != nil { + return nil, err + } + rhsMapped, err := m.Map(e.SampleExpr) + if err != nil { + return nil, err + } + lhsSampleExpr, ok := lhsMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", lhsMapped) + } + rhsSampleExpr, ok := rhsMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", rhsMapped) + } + e.SampleExpr = lhsSampleExpr + e.RHS = rhsSampleExpr + return e, nil + default: + return nil, MapperUnsupportedType(expr, m) + } +} + +func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr { + var head *ConcatLogSelectorExpr + for i := m.shards - 1; i >= 0; i-- { + head = &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + LogSelectorExpr: expr, + shard: &astmapper.ShardAnnotation{Shard: i, Of: m.shards}, + }, + next: head, + } + } + + return head +} // technically, std{dev,var} are also parallelizable if there is no cross-shard merging // in descendent nodes in the AST. This optimization is currently avoided for simplicity. -var parallelOperations = map[string]bool{ - OpTypeSum: true, - OpTypeAvg: true, - OpTypeMax: true, - OpTypeMin: true, - OpTypeCount: true, - OpTypeBottomK: true, - OpTypeTopK: true, - OpTypeCountOverTime: true, - OpTypeRate: true, -} - -func CanParallel(e Expr) bool { - switch expr := e.(type) { - case *matchersExpr, *filterExpr: - return true - case *rangeAggregationExpr: - return parallelOperations[expr.operation] - case *vectorAggregationExpr: - return parallelOperations[expr.operation] && CanParallel(expr.left) +func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (Expr, error) { + switch expr.operation { + case OpTypeSum: + case OpTypeAvg: + case OpTypeMax: + case OpTypeMin: + case OpTypeCount: + case OpTypeBottomK: + case OpTypeTopK: default: - return false + return expr, nil } } + +func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) (Expr, error) { + switch expr.operation { + case OpTypeCountOverTime: + case OpTypeRate: + default: + return expr, nil + } +} + +func badASTMapping(expected string, got Expr) error { + return fmt.Errorf("Bad AST mapping: expected one type (%s), but got (%T)", expected, got) +} + +// MapperUnsuportedType is a helper for signaling that an evaluator does not support an Expr type +func MapperUnsupportedType(expr Expr, m ASTMapper) error { + return errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m) +} From ff1ca2b21cec54ed21180dea50de4ed25ce83376 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 22 Mar 2020 23:16:46 -0400 Subject: [PATCH 14/21] ast sharding optimizations --- pkg/logql/astmapper.go | 85 +++++++++++++++++++++++++---- pkg/logql/sharding_optimizations.go | 19 ------- 2 files changed, 73 insertions(+), 31 deletions(-) delete mode 100644 pkg/logql/sharding_optimizations.go diff --git a/pkg/logql/astmapper.go b/pkg/logql/astmapper.go index c3a4834327fb3..9db674378a3fc 100644 --- a/pkg/logql/astmapper.go +++ b/pkg/logql/astmapper.go @@ -30,7 +30,7 @@ func (m ShardMapper) Map(expr Expr) (Expr, error) { case *vectorAggregationExpr: return m.mapVectorAggregationExpr(e) case *rangeAggregationExpr: - return m.mapRangeAggregationExpr(e) + return m.mapRangeAggregationExpr(e), nil case *binOpExpr: lhsMapped, err := m.Map(e.SampleExpr) if err != nil { @@ -62,7 +62,27 @@ func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr { head = &ConcatLogSelectorExpr{ LogSelectorExpr: DownstreamLogSelectorExpr{ LogSelectorExpr: expr, - shard: &astmapper.ShardAnnotation{Shard: i, Of: m.shards}, + shard: &astmapper.ShardAnnotation{ + Shard: i, + Of: m.shards, + }, + }, + next: head, + } + } + + return head +} + +func (m ShardMapper) mapSampleExpr(expr SampleExpr) SampleExpr { + var head *ConcatSampleExpr + for i := m.shards - 1; i >= 0; i-- { + head = &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: i, + Of: m.shards, + }, }, next: head, } @@ -73,26 +93,67 @@ func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr { // technically, std{dev,var} are also parallelizable if there is no cross-shard merging // in descendent nodes in the AST. This optimization is currently avoided for simplicity. -func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (Expr, error) { +func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (SampleExpr, error) { switch expr.operation { - case OpTypeSum: + // sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) + // max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...) + // min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...) + // topk(x) -> topk(topk(x, shard=1) ++ topk(x, shard=2)...) + // botk(x) -> botk(botk(x, shard=1) ++ botk(x, shard=2)...) + case OpTypeSum, OpTypeMax, OpTypeMin, OpTypeTopK, OpTypeBottomK: + return &vectorAggregationExpr{ + left: m.mapSampleExpr(expr), + grouping: expr.grouping, + params: expr.params, + operation: expr.operation, + }, nil + case OpTypeAvg: - case OpTypeMax: - case OpTypeMin: + // avg(x) -> sum(x)/count(x) + lhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ + left: expr.left, + grouping: expr.grouping, + operation: OpTypeSum, + }) + if err != nil { + return nil, err + } + rhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ + left: expr.left, + grouping: expr.grouping, + operation: OpTypeCount, + }) + if err != nil { + return nil, err + } + + return &binOpExpr{ + SampleExpr: lhs, + RHS: rhs, + op: OpTypeDiv, + }, nil + case OpTypeCount: - case OpTypeBottomK: - case OpTypeTopK: + // count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...) + sharded := m.mapSampleExpr(expr) + return &vectorAggregationExpr{ + left: sharded, + grouping: expr.grouping, + operation: OpTypeSum, + }, nil default: return expr, nil } } -func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) (Expr, error) { +func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleExpr { switch expr.operation { - case OpTypeCountOverTime: - case OpTypeRate: + case OpTypeCountOverTime, OpTypeRate: + // count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... + // rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... + return m.mapSampleExpr(expr) default: - return expr, nil + return expr } } diff --git a/pkg/logql/sharding_optimizations.go b/pkg/logql/sharding_optimizations.go deleted file mode 100644 index 2d045ac32907d..0000000000000 --- a/pkg/logql/sharding_optimizations.go +++ /dev/null @@ -1,19 +0,0 @@ -package logql - -// avg(x) -> sum(x)/count(x) - -// sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) - -// max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...) - -// min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...) - -// count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...) - -// topk(x) -> topk(topk(x, shard=1) ++ topk(x, shard=2)...) - -// botk(x) -> botk(botk(x, shard=1) ++ botk(x, shard=2)...) - -// rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... - -// count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... From 6a3e8603d11e3e3474d4789167d220f27a230e28 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 23 Mar 2020 09:34:48 -0400 Subject: [PATCH 15/21] test for different logrange positions --- pkg/logql/parser_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index 13a38536fb99c..2687249513c2d 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -20,6 +20,44 @@ func TestParse(t *testing.T) { exp Expr err error }{ + { + // test [12h] before filter expr + in: `count_over_time({foo="bar"}[12h] |= "error")`, + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: &filterExpr{ + ty: labels.MatchEqual, + match: "error", + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + interval: 12 * time.Hour, + }, + }, + }, + { + // test [12h] after filter expr + in: `count_over_time({foo="bar"} |= "error" [12h])`, + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: &filterExpr{ + ty: labels.MatchEqual, + match: "error", + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + interval: 12 * time.Hour, + }, + }, + }, { in: `{foo="bar"}`, exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, From 2657ab0264360f3aa53eaafa48f91a572bb71f90 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 23 Mar 2020 10:33:05 -0400 Subject: [PATCH 16/21] shard mapper tests --- pkg/logql/astmapper.go | 141 ----------------------- pkg/logql/shardmapper.go | 155 +++++++++++++++++++++++++ pkg/logql/shardmapper_test.go | 209 ++++++++++++++++++++++++++++++++++ 3 files changed, 364 insertions(+), 141 deletions(-) create mode 100644 pkg/logql/shardmapper.go create mode 100644 pkg/logql/shardmapper_test.go diff --git a/pkg/logql/astmapper.go b/pkg/logql/astmapper.go index 9db674378a3fc..6067fbeb93e94 100644 --- a/pkg/logql/astmapper.go +++ b/pkg/logql/astmapper.go @@ -3,7 +3,6 @@ package logql import ( "fmt" - "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/pkg/errors" ) @@ -17,146 +16,6 @@ func CloneExpr(expr Expr) (Expr, error) { return ParseExpr(expr.String()) } -type ShardMapper struct { - shards int -} - -func (m ShardMapper) Map(expr Expr) (Expr, error) { - switch e := expr.(type) { - case *literalExpr: - return e, nil - case *matchersExpr, *filterExpr: - return m.mapLogSelectorExpr(e.(LogSelectorExpr)), nil - case *vectorAggregationExpr: - return m.mapVectorAggregationExpr(e) - case *rangeAggregationExpr: - return m.mapRangeAggregationExpr(e), nil - case *binOpExpr: - lhsMapped, err := m.Map(e.SampleExpr) - if err != nil { - return nil, err - } - rhsMapped, err := m.Map(e.SampleExpr) - if err != nil { - return nil, err - } - lhsSampleExpr, ok := lhsMapped.(SampleExpr) - if !ok { - return nil, badASTMapping("SampleExpr", lhsMapped) - } - rhsSampleExpr, ok := rhsMapped.(SampleExpr) - if !ok { - return nil, badASTMapping("SampleExpr", rhsMapped) - } - e.SampleExpr = lhsSampleExpr - e.RHS = rhsSampleExpr - return e, nil - default: - return nil, MapperUnsupportedType(expr, m) - } -} - -func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr { - var head *ConcatLogSelectorExpr - for i := m.shards - 1; i >= 0; i-- { - head = &ConcatLogSelectorExpr{ - LogSelectorExpr: DownstreamLogSelectorExpr{ - LogSelectorExpr: expr, - shard: &astmapper.ShardAnnotation{ - Shard: i, - Of: m.shards, - }, - }, - next: head, - } - } - - return head -} - -func (m ShardMapper) mapSampleExpr(expr SampleExpr) SampleExpr { - var head *ConcatSampleExpr - for i := m.shards - 1; i >= 0; i-- { - head = &ConcatSampleExpr{ - SampleExpr: DownstreamSampleExpr{ - shard: &astmapper.ShardAnnotation{ - Shard: i, - Of: m.shards, - }, - }, - next: head, - } - } - - return head -} - -// technically, std{dev,var} are also parallelizable if there is no cross-shard merging -// in descendent nodes in the AST. This optimization is currently avoided for simplicity. -func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (SampleExpr, error) { - switch expr.operation { - // sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) - // max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...) - // min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...) - // topk(x) -> topk(topk(x, shard=1) ++ topk(x, shard=2)...) - // botk(x) -> botk(botk(x, shard=1) ++ botk(x, shard=2)...) - case OpTypeSum, OpTypeMax, OpTypeMin, OpTypeTopK, OpTypeBottomK: - return &vectorAggregationExpr{ - left: m.mapSampleExpr(expr), - grouping: expr.grouping, - params: expr.params, - operation: expr.operation, - }, nil - - case OpTypeAvg: - // avg(x) -> sum(x)/count(x) - lhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ - left: expr.left, - grouping: expr.grouping, - operation: OpTypeSum, - }) - if err != nil { - return nil, err - } - rhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ - left: expr.left, - grouping: expr.grouping, - operation: OpTypeCount, - }) - if err != nil { - return nil, err - } - - return &binOpExpr{ - SampleExpr: lhs, - RHS: rhs, - op: OpTypeDiv, - }, nil - - case OpTypeCount: - // count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...) - sharded := m.mapSampleExpr(expr) - return &vectorAggregationExpr{ - left: sharded, - grouping: expr.grouping, - operation: OpTypeSum, - }, nil - default: - return expr, nil - } -} - -func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleExpr { - switch expr.operation { - case OpTypeCountOverTime, OpTypeRate: - // count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... - // rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... - return m.mapSampleExpr(expr) - default: - return expr - } -} - func badASTMapping(expected string, got Expr) error { return fmt.Errorf("Bad AST mapping: expected one type (%s), but got (%T)", expected, got) } diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go new file mode 100644 index 0000000000000..dbfbd483dcf95 --- /dev/null +++ b/pkg/logql/shardmapper.go @@ -0,0 +1,155 @@ +package logql + +import ( + "fmt" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" +) + +func NewShardMapper(shards int) (ShardMapper, error) { + if shards < 2 { + return ShardMapper{}, fmt.Errorf("Cannot create ShardMapper with <2 shards. Received %d", shards) + } + return ShardMapper{shards}, nil +} + +type ShardMapper struct { + shards int +} + +func (m ShardMapper) Map(expr Expr) (Expr, error) { + switch e := expr.(type) { + case *literalExpr: + return e, nil + case *matchersExpr, *filterExpr: + return m.mapLogSelectorExpr(e.(LogSelectorExpr)), nil + case *vectorAggregationExpr: + return m.mapVectorAggregationExpr(e) + case *rangeAggregationExpr: + return m.mapRangeAggregationExpr(e), nil + case *binOpExpr: + lhsMapped, err := m.Map(e.SampleExpr) + if err != nil { + return nil, err + } + rhsMapped, err := m.Map(e.SampleExpr) + if err != nil { + return nil, err + } + lhsSampleExpr, ok := lhsMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", lhsMapped) + } + rhsSampleExpr, ok := rhsMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", rhsMapped) + } + e.SampleExpr = lhsSampleExpr + e.RHS = rhsSampleExpr + return e, nil + default: + return nil, MapperUnsupportedType(expr, m) + } +} + +func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr { + var head *ConcatLogSelectorExpr + for i := m.shards - 1; i >= 0; i-- { + head = &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: i, + Of: m.shards, + }, + LogSelectorExpr: expr, + }, + next: head, + } + } + + return head +} + +func (m ShardMapper) mapSampleExpr(expr SampleExpr) SampleExpr { + var head *ConcatSampleExpr + for i := m.shards - 1; i >= 0; i-- { + head = &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: i, + Of: m.shards, + }, + SampleExpr: expr, + }, + next: head, + } + } + + return head +} + +// technically, std{dev,var} are also parallelizable if there is no cross-shard merging +// in descendent nodes in the AST. This optimization is currently avoided for simplicity. +func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (SampleExpr, error) { + switch expr.operation { + // sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) + // max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...) + // min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...) + // topk(x) -> topk(topk(x, shard=1) ++ topk(x, shard=2)...) + // botk(x) -> botk(botk(x, shard=1) ++ botk(x, shard=2)...) + case OpTypeSum, OpTypeMax, OpTypeMin, OpTypeTopK, OpTypeBottomK: + return &vectorAggregationExpr{ + left: m.mapSampleExpr(expr), + grouping: expr.grouping, + params: expr.params, + operation: expr.operation, + }, nil + + case OpTypeAvg: + // avg(x) -> sum(x)/count(x) + lhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ + left: expr.left, + grouping: expr.grouping, + operation: OpTypeSum, + }) + if err != nil { + return nil, err + } + rhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{ + left: expr.left, + grouping: expr.grouping, + operation: OpTypeCount, + }) + if err != nil { + return nil, err + } + + return &binOpExpr{ + SampleExpr: lhs, + RHS: rhs, + op: OpTypeDiv, + }, nil + + case OpTypeCount: + // count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...) + sharded := m.mapSampleExpr(expr) + return &vectorAggregationExpr{ + left: sharded, + grouping: expr.grouping, + operation: OpTypeSum, + }, nil + default: + return expr, nil + } +} + +func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleExpr { + switch expr.operation { + case OpTypeCountOverTime, OpTypeRate: + // count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)... + // rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)... + return m.mapSampleExpr(expr) + default: + return expr + } +} diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go new file mode 100644 index 0000000000000..ccd73f4be3a96 --- /dev/null +++ b/pkg/logql/shardmapper_test.go @@ -0,0 +1,209 @@ +package logql + +import ( + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" +) + +func TestMapSampleExpr(t *testing.T) { + m, err := NewShardMapper(2) + require.Nil(t, err) + + for _, tc := range []struct { + in SampleExpr + out SampleExpr + }{ + { + in: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: time.Minute, + }, + }, + out: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + } { + t.Run(tc.in.String(), func(t *testing.T) { + require.Equal(t, tc.out, m.mapSampleExpr(tc.in)) + }) + + } +} + +func TestMapping(t *testing.T) { + m, err := NewShardMapper(2) + require.Nil(t, err) + + for _, tc := range []struct { + in string + expr Expr + err error + }{ + { + in: `{foo="bar"}`, + expr: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `{foo="bar"} |= "error"`, + expr: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + LogSelectorExpr: &filterExpr{ + match: "error", + ty: labels.MatchEqual, + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + }, + next: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + LogSelectorExpr: &filterExpr{ + match: "error", + ty: labels.MatchEqual, + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `rate({foo="bar"}[5m])`, + expr: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + } { + t.Run(tc.in, func(t *testing.T) { + ast, err := ParseExpr(tc.in) + require.Equal(t, tc.err, err) + + mapped, err := m.Map(ast) + require.Equal(t, tc.err, err) + require.Equal(t, tc.expr, mapped) + }) + } +} From 84d64b9195584b7ed179230781e0f5fd3c6ee68d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 23 Mar 2020 18:54:25 -0400 Subject: [PATCH 17/21] stronger ast sharding & tests --- pkg/logql/ast.go | 40 +- pkg/logql/sharding.go | 27 ++ pkg/logql/shardmapper.go | 83 +++- pkg/logql/shardmapper_test.go | 690 ++++++++++++++++++++++++++++++++++ 4 files changed, 824 insertions(+), 16 deletions(-) diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 91e3c8c5ade60..73b5cea25bbab 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -182,15 +182,18 @@ func addFilterToLogRangeExpr(left *logRange, ty labels.MatchType, match string) } const ( - OpTypeSum = "sum" - OpTypeAvg = "avg" - OpTypeMax = "max" - OpTypeMin = "min" - OpTypeCount = "count" - OpTypeStddev = "stddev" - OpTypeStdvar = "stdvar" - OpTypeBottomK = "bottomk" - OpTypeTopK = "topk" + // vector ops + OpTypeSum = "sum" + OpTypeAvg = "avg" + OpTypeMax = "max" + OpTypeMin = "min" + OpTypeCount = "count" + OpTypeStddev = "stddev" + OpTypeStdvar = "stdvar" + OpTypeBottomK = "bottomk" + OpTypeTopK = "topk" + + // range vector ops OpTypeCountOverTime = "count_over_time" OpTypeRate = "rate" @@ -217,6 +220,8 @@ func IsLogicalBinOp(op string) bool { type SampleExpr interface { // Selector is the LogQL selector to apply when retrieving logs. Selector() LogSelectorExpr + // Operations returns the list of operations used in this SampleExpr + Operations() []string Expr } @@ -244,6 +249,11 @@ func (e *rangeAggregationExpr) String() string { return formatOperation(e.operation, nil, e.left.String()) } +// impl SampleExpr +func (e *rangeAggregationExpr) Operations() []string { + return []string{e.operation} +} + type grouping struct { groups []string without bool @@ -320,6 +330,11 @@ func (e *vectorAggregationExpr) String() string { return formatOperation(e.operation, e.grouping, params...) } +// impl SampleExpr +func (e *vectorAggregationExpr) Operations() []string { + return append(e.left.Operations(), e.operation) +} + type binOpExpr struct { SampleExpr RHS SampleExpr @@ -330,6 +345,12 @@ func (e *binOpExpr) String() string { return fmt.Sprintf("%s %s %s", e.SampleExpr.String(), e.op, e.RHS.String()) } +// impl SampleExpr +func (e *binOpExpr) Operations() []string { + ops := append(e.SampleExpr.Operations(), e.RHS.Operations()...) + return append(ops, e.op) +} + func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr { left, ok := lhs.(SampleExpr) if !ok { @@ -423,6 +444,7 @@ func (e *literalExpr) String() string { // to facilitate sum types. We'll be type switching when evaluating them anyways // and they will only be present in binary operation legs. func (e *literalExpr) Selector() LogSelectorExpr { return e } +func (e *literalExpr) Operations() []string { return nil } func (e *literalExpr) Filter() (LineFilter, error) { return nil, nil } func (e *literalExpr) Matchers() []*labels.Matcher { return nil } diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index 1af369cb7653d..d8ae72ea3e8d6 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -2,6 +2,7 @@ package logql import ( "context" + "fmt" "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/grafana/loki/pkg/iter" @@ -15,24 +16,50 @@ type DownstreamSampleExpr struct { SampleExpr } +func (d DownstreamSampleExpr) String() string { + return fmt.Sprintf("downstream<%s, shard=%s>", d.SampleExpr.String(), d.shard) +} + // DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation type DownstreamLogSelectorExpr struct { shard *astmapper.ShardAnnotation LogSelectorExpr } +func (d DownstreamLogSelectorExpr) String() string { + return fmt.Sprintf("downstream<%s, shard=%s>", d.LogSelectorExpr.String(), d.shard) +} + // ConcatSampleExpr is an expr for concatenating multiple SampleExpr +// Contract: The embedded SampleExprs within a linked list of ConcatSampleExprs must be of the +// same structure. This makes special implementations of SampleExpr.Associative() unnecessary. type ConcatSampleExpr struct { SampleExpr next *ConcatSampleExpr } +func (c ConcatSampleExpr) String() string { + if c.next == nil { + return c.SampleExpr.String() + } + + return fmt.Sprintf("%s ++ %s", c.SampleExpr.String(), c.next.String()) +} + // ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr type ConcatLogSelectorExpr struct { LogSelectorExpr next *ConcatLogSelectorExpr } +func (c ConcatLogSelectorExpr) String() string { + if c.next == nil { + return c.LogSelectorExpr.String() + } + + return fmt.Sprintf("%s ++ %s", c.LogSelectorExpr.String(), c.next.String()) +} + // downstreamEvaluator is an evaluator which handles shard aware AST nodes type downstreamEvaluator struct{ Downstreamer } diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index dbfbd483dcf95..79303365a96d0 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" ) func NewShardMapper(shards int) (ShardMapper, error) { @@ -32,7 +34,7 @@ func (m ShardMapper) Map(expr Expr) (Expr, error) { if err != nil { return nil, err } - rhsMapped, err := m.Map(e.SampleExpr) + rhsMapped, err := m.Map(e.RHS) if err != nil { return nil, err } @@ -91,13 +93,31 @@ func (m ShardMapper) mapSampleExpr(expr SampleExpr) SampleExpr { // technically, std{dev,var} are also parallelizable if there is no cross-shard merging // in descendent nodes in the AST. This optimization is currently avoided for simplicity. func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (SampleExpr, error) { + + // if this AST contains unshardable operations, don't shard this at this level, + // but attempt to shard a child node. + if shardable := isShardable(expr.Operations()); !shardable { + subMapped, err := m.Map(expr.left) + if err != nil { + return nil, err + } + sampleExpr, ok := subMapped.(SampleExpr) + if !ok { + return nil, badASTMapping("SampleExpr", subMapped) + } + + return &vectorAggregationExpr{ + left: sampleExpr, + grouping: expr.grouping, + params: expr.params, + operation: expr.operation, + }, nil + + } + switch expr.operation { - // sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) - // max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...) - // min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...) - // topk(x) -> topk(topk(x, shard=1) ++ topk(x, shard=2)...) - // botk(x) -> botk(botk(x, shard=1) ++ botk(x, shard=2)...) - case OpTypeSum, OpTypeMax, OpTypeMin, OpTypeTopK, OpTypeBottomK: + case OpTypeSum: + // sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...) return &vectorAggregationExpr{ left: m.mapSampleExpr(expr), grouping: expr.grouping, @@ -139,6 +159,12 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (Samp operation: OpTypeSum, }, nil default: + // this should not be reachable. If an operation is shardable it should + // have an optimization listed. + level.Warn(util.Logger).Log( + "msg", "unexpected operation which appears shardable, ignoring", + "operation", expr.operation, + ) return expr, nil } } @@ -153,3 +179,46 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleE return expr } } + +// isShardable returns false if any of the listed operation types are not shardable and true otherwise +func isShardable(ops []string) bool { + for _, op := range ops { + if shardable := shardableOps[op]; !shardable { + return false + } + } + return true +} + +// shardableOps lists the operations which may be sharded. +// topk, botk, max, & min all must be concatenated and then evaluated in order to avoid +// potential data loss due to series distribution across shards. +// For example, grouping by `cluster` for a `max` operation may yield +// 2 results on the first shard and 10 results on the second. If we prematurely +// calculated `max`s on each shard, the shard/label combination with `2` may be +// discarded and some other combination with `11` may be reported falsely as the max. +// +// Explanation: this is my (owen-d) best understanding. +// +// For an operation to be shardable, first the sample-operation itself must be associative like (+, *) but not (%, /, ^). +// Secondly, if the operation is part of a vector aggregation expression or utilizes logical/set binary ops, +// the vector operation must be distributive over the sample-operation. +// This ensures that the vector merging operation can be applied repeatedly to data in different shards. +// references: +// https://en.wikipedia.org/wiki/Associative_property +// https://en.wikipedia.org/wiki/Distributive_property +var shardableOps = map[string]bool{ + // vector ops + OpTypeSum: true, + // avg is only marked as shardable because we remap it into sum/count. + OpTypeAvg: true, + OpTypeCount: true, + + // range vector ops + OpTypeCountOverTime: true, + OpTypeRate: true, + + // binops - arith + OpTypeAdd: true, + OpTypeMul: true, +} diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index ccd73f4be3a96..9929c9e929d01 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -9,6 +9,48 @@ import ( "github.com/stretchr/testify/require" ) +func TestStringer(t *testing.T) { + for _, tc := range []struct { + in Expr + out string + }{ + { + in: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: &ConcatLogSelectorExpr{ + LogSelectorExpr: DownstreamLogSelectorExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + LogSelectorExpr: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + }, + next: nil, + }, + }, + out: `downstream<{foo="bar"}, shard=0_of_2> ++ downstream<{foo="bar"}, shard=1_of_2>`, + }, + } { + t.Run(tc.out, func(t *testing.T) { + require.Equal(t, tc.out, tc.in.String()) + }) + } +} + func TestMapSampleExpr(t *testing.T) { m, err := NewShardMapper(2) require.Nil(t, err) @@ -196,13 +238,661 @@ func TestMapping(t *testing.T) { }, }, }, + { + in: `count_over_time({foo="bar"}[5m])`, + expr: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeCountOverTime, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeCountOverTime, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + { + in: `sum(rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `topk(3, rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + params: 3, + operation: OpTypeTopK, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `max without (env) (rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{ + without: true, + groups: []string{"env"}, + }, + operation: OpTypeMax, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `count(rate({foo="bar"}[5m]))`, + expr: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + { + in: `avg(rate({foo="bar"}[5m]))`, + expr: &binOpExpr{ + op: OpTypeDiv, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + RHS: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + { + in: `1 + sum by (cluster) (rate({foo="bar"}[5m]))`, + expr: &binOpExpr{ + op: OpTypeAdd, + SampleExpr: &literalExpr{1}, + RHS: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + // sum(max) should not shard the maxes + { + in: `sum(max(rate({foo="bar"}[5m])))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeSum, + left: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeMax, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + // max(count) should shard the count, but not the max + { + in: `max(count(rate({foo="bar"}[5m])))`, + expr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeMax, + left: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, + { + in: `max(sum by (cluster) (rate({foo="bar"}[5m]))) / count(rate({foo="bar"}[5m]))`, + expr: &binOpExpr{ + op: OpTypeDiv, + SampleExpr: &vectorAggregationExpr{ + operation: OpTypeMax, + grouping: &grouping{}, + left: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{ + groups: []string{"cluster"}, + }, + operation: OpTypeSum, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + RHS: &vectorAggregationExpr{ + operation: OpTypeSum, + grouping: &grouping{}, + left: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + SampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &vectorAggregationExpr{ + grouping: &grouping{}, + operation: OpTypeCount, + left: &rangeAggregationExpr{ + operation: OpTypeRate, + left: &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := ParseExpr(tc.in) require.Equal(t, tc.err, err) mapped, err := m.Map(ast) + require.Equal(t, tc.err, err) + require.Equal(t, tc.expr.String(), mapped.String()) require.Equal(t, tc.expr, mapped) }) } From 0f234cfea0db9ba8760f42e23c6da646000a620d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 24 Mar 2020 10:01:06 -0400 Subject: [PATCH 18/21] shardmapper tests for string->string --- pkg/logql/shardmapper_test.go | 45 +++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 9929c9e929d01..f02107b7adfe0 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -119,6 +119,51 @@ func TestMapSampleExpr(t *testing.T) { } } +func TestMappingStrings(t *testing.T) { + m, err := NewShardMapper(2) + require.Nil(t, err) + for _, tc := range []struct { + in string + out string + }{ + { + in: `sum(rate({foo="bar"}[1m]))`, + out: `sum(downstream ++ downstream)`, + }, + { + in: `max(count(rate({foo="bar"}[5m]))) / 2`, + out: `max(sum(downstream ++ downstream)) / 2.000000`, + }, + { + in: `topk(3, rate({foo="bar"}[5m]))`, + out: `topk(3,downstream ++ downstream)`, + }, + { + in: `sum(max(rate({foo="bar"}[5m])))`, + out: `sum(max(downstream ++ downstream))`, + }, + { + in: `{foo="bar"} |= "id=123"`, + out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> ++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`, + }, + { + in: `sum by (cluster) (rate({foo="bar"} |= "id=123" [5m]))`, + out: `sum by(cluster)(downstream ++ downstream)`, + }, + } { + t.Run(tc.in, func(t *testing.T) { + ast, err := ParseExpr(tc.in) + require.Nil(t, err) + + mapped, err := m.Map(ast) + require.Nil(t, err) + + require.Equal(t, tc.out, mapped.String()) + + }) + } +} + func TestMapping(t *testing.T) { m, err := NewShardMapper(2) require.Nil(t, err) From 3743f37b2366d6bc88a5b254025d1dba47db07d8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 24 Mar 2020 10:22:55 -0400 Subject: [PATCH 19/21] removes sharding evaluator code --- pkg/logql/chainedevaluator.go | 53 ------------------- pkg/logql/downstreamer.go | 7 --- pkg/logql/sharding.go | 97 ----------------------------------- 3 files changed, 157 deletions(-) delete mode 100644 pkg/logql/chainedevaluator.go delete mode 100644 pkg/logql/downstreamer.go diff --git a/pkg/logql/chainedevaluator.go b/pkg/logql/chainedevaluator.go deleted file mode 100644 index 346dd6992cb4f..0000000000000 --- a/pkg/logql/chainedevaluator.go +++ /dev/null @@ -1,53 +0,0 @@ -package logql - -import ( - "context" - - "github.com/grafana/loki/pkg/iter" - "github.com/pkg/errors" -) - -// ChainedEvaluator is an evaluator which chains multiple other evaluators, -// deferring to the first successful one. -type ChainedEvaluator struct { - evaluators []Evaluator -} - -// StepEvaluator attempts the embedded evaluators until one succeeds or they all error. -func (c *ChainedEvaluator) StepEvaluator( - ctx context.Context, - nextEvaluator Evaluator, - expr SampleExpr, - p Params, -) (stepper StepEvaluator, err error) { - for _, eval := range c.evaluators { - if stepper, err = eval.StepEvaluator(ctx, nextEvaluator, expr, p); err == nil { - return stepper, nil - } - } - return nil, err -} - -// Iterator attempts the embedded evaluators until one succeeds or they all error. -func (c *ChainedEvaluator) Iterator( - ctx context.Context, - expr LogSelectorExpr, - p Params, -) (iterator iter.EntryIterator, err error) { - for _, eval := range c.evaluators { - if iterator, err = eval.Iterator(ctx, expr, p); err == nil { - return iterator, nil - } - } - return nil, err -} - -// NewChainedEvaluator constructs a ChainedEvaluator from one or more Evaluators -func NewChainedEvaluator(evals ...Evaluator) (*ChainedEvaluator, error) { - if len(evals) == 0 { - return nil, errors.New("must supply an Evaluator") - } - return &ChainedEvaluator{ - evaluators: evals, - }, nil -} diff --git a/pkg/logql/downstreamer.go b/pkg/logql/downstreamer.go deleted file mode 100644 index 96fdef69dc919..0000000000000 --- a/pkg/logql/downstreamer.go +++ /dev/null @@ -1,7 +0,0 @@ -package logql - -// Downstreamer is an interface for deferring responsibility for query execution. -// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs. -type Downstreamer interface { - // Downstream(*LokiRequest) (*LokiResponse, error) -} diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index d8ae72ea3e8d6..5141a160d3a36 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -1,13 +1,9 @@ package logql import ( - "context" "fmt" "github.com/cortexproject/cortex/pkg/querier/astmapper" - "github.com/grafana/loki/pkg/iter" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/promql" ) // DownstreamSampleExpr is a SampleExpr which signals downstream computation @@ -59,96 +55,3 @@ func (c ConcatLogSelectorExpr) String() string { return fmt.Sprintf("%s ++ %s", c.LogSelectorExpr.String(), c.next.String()) } - -// downstreamEvaluator is an evaluator which handles shard aware AST nodes -type downstreamEvaluator struct{ Downstreamer } - -// Evaluator returns a StepEvaluator for a given SampleExpr -func (ev *downstreamEvaluator) StepEvaluator( - ctx context.Context, - nextEv Evaluator, - expr SampleExpr, - params Params, -) (StepEvaluator, error) { - switch e := expr.(type) { - case DownstreamSampleExpr: - // downstream to a querier - return nil, errors.New("unimplemented") - - case ConcatSampleExpr: - // ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat - var xs []StepEvaluator - cur := &e - - for cur != nil { - eval, err := ev.StepEvaluator(ctx, nextEv, cur.SampleExpr, params) - if err != nil { - // Close previously opened StepEvaluators - for _, x := range xs { - x.Close() - } - return nil, err - } - xs = append(xs, eval) - cur = cur.next - } - - return ConcatEvaluator(xs) - - default: - return nil, EvaluatorUnsupportedType(expr, ev) - } -} - -// Iterator returns the iter.EntryIterator for a given LogSelectorExpr -func (ev *downstreamEvaluator) Iterator( - ctx context.Context, - expr LogSelectorExpr, - params Params, -) (iter.EntryIterator, error) { - switch e := expr.(type) { - case DownstreamLogSelectorExpr: - // downstream to a querier - return nil, errors.New("unimplemented") - case ConcatLogSelectorExpr: - var iters []iter.EntryIterator - cur := &e - for cur != nil { - iterator, err := ev.Iterator(ctx, e, params) - if err != nil { - // Close previously opened StepEvaluators - for _, x := range iters { - x.Close() - } - return nil, err - } - iters = append(iters, iterator) - } - return iter.NewHeapIterator(ctx, iters, params.Direction()), nil - } - return nil, errors.New("unimplemented") -} - -// ConcatEvaluator joins multiple StepEvaluators. -// Contract: They must be of identical start, end, and step values. -func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { - return newStepEvaluator( - func() (done bool, ts int64, vec promql.Vector) { - var cur promql.Vector - for _, eval := range evaluators { - done, ts, cur = eval.Next() - vec = append(vec, cur...) - } - return done, ts, vec - - }, - func() (lastErr error) { - for _, eval := range evaluators { - if err := eval.Close(); err != nil { - lastErr = err - } - } - return lastErr - }, - ) -} From 53563610251bb1cb2251216405d68c7e9166d21e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 24 Mar 2020 10:37:35 -0400 Subject: [PATCH 20/21] removes unused ctx arg --- pkg/logql/evaluator.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index f0e39d5238866..01b2e35f65c80 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -127,7 +127,7 @@ func (ev *defaultEvaluator) StepEvaluator( if err != nil { return nil, err } - return rangeAggEvaluator(ctx, entryIter, e, q) + return rangeAggEvaluator(entryIter, e, q) case *binOpExpr: return binOpStepEvaluator(ctx, nextEv, e, q) default: @@ -331,7 +331,6 @@ func vectorAggEvaluator( } func rangeAggEvaluator( - ctx context.Context, entryIter iter.EntryIterator, expr *rangeAggregationExpr, q Params, From eee8798ee0b2ebf9f329fdeb80343c9f45718cd3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 26 Mar 2020 13:27:45 -0400 Subject: [PATCH 21/21] Update pkg/logql/evaluator.go Co-Authored-By: Cyril Tovena --- pkg/logql/evaluator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 01b2e35f65c80..85659db9bef89 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -67,7 +67,7 @@ func GetRangeType(q Params) QueryRangeType { return RangeType } -// StepEvaluator is an interface for iterating over data at different nodes in the AST +// Evaluator is an interface for iterating over data at different nodes in the AST type Evaluator interface { // StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible // StepEvaluator implementations which can be composed.