Skip to content

Commit

Permalink
fix(engine): Fix Walk() function implementation on various Expr i…
Browse files Browse the repository at this point in the history
…mplementations (#16033)

The `Walk(f WalkFn)` implementation expects to first visit the current node and then invoke `Walk(f)` on all its children if they are not `nil`.

This also fixes the `checkIntervalLimit(syntax.SampleExpr, time.Duration)` function, which did not visit the expression if it was a `*ConcatSampleExpr`.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored Feb 4, 2025
1 parent 3a02d64 commit 3888866
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 40 deletions.
33 changes: 30 additions & 3 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ func (d DownstreamLogSelectorExpr) Pretty(level int) string {
return s
}

func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) { f(d) }
func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) {
f(d)
if d.SampleExpr != nil {
d.SampleExpr.Walk(f)
}
}

var defaultMaxDepth = 4

Expand Down Expand Up @@ -173,7 +178,12 @@ func (c *ConcatSampleExpr) string(maxDepth int) string {

func (c *ConcatSampleExpr) Walk(f syntax.WalkFn) {
f(c)
f(c.next)
if c.SampleExpr != nil {
c.SampleExpr.Walk(f)
}
if c.next != nil {
c.next.Walk(f)
}
}

// ConcatSampleExpr has no LogQL repretenstation. It is expressed in in the
Expand Down Expand Up @@ -271,7 +281,12 @@ func (e QuantileSketchEvalExpr) String() string {

func (e *QuantileSketchEvalExpr) Walk(f syntax.WalkFn) {
f(e)
e.quantileMergeExpr.Walk(f)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
if e.quantileMergeExpr != nil {
e.quantileMergeExpr.Walk(f)
}
}

type QuantileSketchMergeExpr struct {
Expand All @@ -297,6 +312,9 @@ func (e QuantileSketchMergeExpr) String() string {

func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
Expand Down Expand Up @@ -326,6 +344,9 @@ func (e MergeFirstOverTimeExpr) String() string {

func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
Expand Down Expand Up @@ -355,6 +376,9 @@ func (e MergeLastOverTimeExpr) String() string {

func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
Expand Down Expand Up @@ -383,6 +407,9 @@ func (e CountMinSketchEvalExpr) String() string {

func (e *CountMinSketchEvalExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,10 @@ func (q *query) checkIntervalLimit(expr syntax.SampleExpr, limit time.Duration)
var err error
expr.Walk(func(e syntax.Expr) {
switch e := e.(type) {
case *syntax.RangeAggregationExpr:
if e.Left == nil || e.Left.Interval <= limit {
return
case *syntax.LogRange:
if e.Interval > limit {
err = fmt.Errorf("%w: [%s] > [%s]", logqlmodel.ErrIntervalLimit, model.Duration(e.Interval), model.Duration(limit))
}
err = fmt.Errorf("%w: [%s] > [%s]", logqlmodel.ErrIntervalLimit, model.Duration(e.Left.Interval), model.Duration(limit))
}
})
return err
Expand Down
39 changes: 39 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,45 @@ var (
ErrMockMultiple = util.MultiError{ErrMock, ErrMock}
)

func TestEngine_checkIntervalLimit(t *testing.T) {
q := &query{}
for _, tc := range []struct {
query string
expErr string
}{
{query: `rate({app="foo"} [1m])`, expErr: ""},
{query: `rate({app="foo"} [10m])`, expErr: ""},
{query: `max(rate({app="foo"} [5m])) - max(rate({app="bar"} [10m]))`, expErr: ""},
{query: `rate({app="foo"} [5m]) - rate({app="bar"} [15m])`, expErr: "[15m] > [10m]"},
{query: `rate({app="foo"} [1h])`, expErr: "[1h] > [10m]"},
{query: `sum(rate({app="foo"} [1h]))`, expErr: "[1h] > [10m]"},
{query: `sum_over_time({app="foo"} |= "foo" | json | unwrap bar [1h])`, expErr: "[1h] > [10m]"},
} {
for _, downstream := range []bool{true, false} {
t.Run(fmt.Sprintf("%v/downstream=%v", tc.query, downstream), func(t *testing.T) {
expr := syntax.MustParseExpr(tc.query).(syntax.SampleExpr)
if downstream {
// Simulate downstream expression
expr = &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: nil,
SampleExpr: expr,
},
next: nil,
}
}
err := q.checkIntervalLimit(expr, 10*time.Minute)
if tc.expErr != "" {
require.ErrorContains(t, err, tc.expErr)
} else {
require.NoError(t, err)
}
})
}

}
}

func TestEngine_LogsRateUnwrap(t *testing.T) {
t.Parallel()
for _, test := range []struct {
Expand Down
46 changes: 23 additions & 23 deletions pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,16 +341,12 @@ func (e *PipelineExpr) Shardable(topLevel bool) bool {
func (e *PipelineExpr) Walk(f WalkFn) {
f(e)

if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}

xs := make([]Walkable, 0, len(e.MultiStages)+1)
xs = append(xs, e.Left)
for _, p := range e.MultiStages {
xs = append(xs, p)
p.Walk(f)
}
walkAll(f, xs...)
}

func (e *PipelineExpr) Accept(v RootVisitor) { v.VisitPipeline(e) }
Expand Down Expand Up @@ -501,10 +497,12 @@ func (*LineFilterExpr) isStageExpr() {}

func (e *LineFilterExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
if e.Or != nil {
e.Or.Walk(f)
}
e.Left.Walk(f)
}

func (e *LineFilterExpr) Accept(v RootVisitor) {
Expand Down Expand Up @@ -1153,10 +1151,9 @@ func (r *LogRange) Shardable(topLevel bool) bool { return r.Left.Shardable(topLe

func (r *LogRange) Walk(f WalkFn) {
f(r)
if r.Left == nil {
return
if r.Left != nil {
r.Left.Walk(f)
}
r.Left.Walk(f)
}

func (r *LogRange) Accept(v RootVisitor) {
Expand Down Expand Up @@ -1476,10 +1473,9 @@ func (e *RangeAggregationExpr) Shardable(topLevel bool) bool {

func (e *RangeAggregationExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
e.Left.Walk(f)
}

func (e *RangeAggregationExpr) Accept(v RootVisitor) { v.VisitRangeAggregation(e) }
Expand Down Expand Up @@ -1686,10 +1682,9 @@ func (e *VectorAggregationExpr) Shardable(topLevel bool) bool {

func (e *VectorAggregationExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
e.Left.Walk(f)
}

func (e *VectorAggregationExpr) Accept(v RootVisitor) { v.VisitVectorAggregation(e) }
Expand Down Expand Up @@ -1806,7 +1801,13 @@ func (e *BinOpExpr) Shardable(topLevel bool) bool {
}

func (e *BinOpExpr) Walk(f WalkFn) {
walkAll(f, e.SampleExpr, e.RHS)
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
if e.RHS != nil {
e.RHS.Walk(f)
}
}

func (e *BinOpExpr) Accept(v RootVisitor) { v.VisitBinOp(e) }
Expand Down Expand Up @@ -2235,10 +2236,9 @@ func (e *LabelReplaceExpr) Shardable(_ bool) bool {

func (e *LabelReplaceExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
e.Left.Walk(f)
}

func (e *LabelReplaceExpr) Accept(v RootVisitor) { v.VisitLabelReplace(e) }
Expand Down
6 changes: 0 additions & 6 deletions pkg/logql/syntax/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@ package syntax

type WalkFn = func(e Expr)

func walkAll(f WalkFn, xs ...Walkable) {
for _, x := range xs {
x.Walk(f)
}
}

type Walkable interface {
Walk(f WalkFn)
}
4 changes: 1 addition & 3 deletions pkg/logql/syntax/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Test_Walkable(t *testing.T) {
{
desc: "bin op query",
expr: `(sum by(cluster)(rate({job="foo"} |= "bar" | logfmt | bazz="buzz"[5m])) / sum by(cluster)(rate({job="foo"} |= "bar" | logfmt | bazz="buzz"[5m])))`,
want: 16,
want: 17,
},
}
for _, test := range tests {
Expand Down Expand Up @@ -79,8 +79,6 @@ func Test_AppendMatchers(t *testing.T) {
switch me := e.(type) {

Check failure on line 79 in pkg/logql/syntax/walk_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exhaustiveness check failed for sum type "Expr" (from pkg/logql/syntax/ast.go:27:6): missing cases for BinOpExpr, DecolorizeExpr, DropLabelsExpr, JSONExpressionParser, KeepLabelsExpr, LabelFilterExpr, LabelFmtExpr, LabelParserExpr, LabelReplaceExpr, LineFilterExpr, LineFmtExpr, LiteralExpr, LogRange, LogSelectorExpr, LogfmtExpressionParser, LogfmtParserExpr, PipelineExpr, RangeAggregationExpr, SampleExpr, StageExpr, VectorAggregationExpr, VectorExpr (gochecksumtype)

Check failure on line 79 in pkg/logql/syntax/walk_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exhaustiveness check failed for sum type "Expr" (from pkg/logql/syntax/ast.go:27:6): missing cases for BinOpExpr, DecolorizeExpr, DropLabelsExpr, JSONExpressionParser, KeepLabelsExpr, LabelFilterExpr, LabelFmtExpr, LabelParserExpr, LabelReplaceExpr, LineFilterExpr, LineFmtExpr, LiteralExpr, LogRange, LogSelectorExpr, LogfmtExpressionParser, LogfmtParserExpr, PipelineExpr, RangeAggregationExpr, SampleExpr, StageExpr, VectorAggregationExpr, VectorExpr (gochecksumtype)
case *MatchersExpr:
me.AppendMatchers(test.matchers)
default:
// Do nothing
}
})
require.Equal(t, test.want, expr.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ func (f fakeLimits) MaxQueryLength(context.Context, string) time.Duration {
}

func (f fakeLimits) MaxQueryRange(context.Context, string) time.Duration {
return time.Second
return time.Hour
}

func (f fakeLimits) MaxQueryParallelism(context.Context, string) int {
Expand Down

0 comments on commit 3888866

Please sign in to comment.