Skip to content

Commit

Permalink
fix: fix aggregate window rules that left plan in bad state (#5076)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher M. Wolff authored Aug 11, 2022
1 parent 837f9e8 commit 35a1a95
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 60 deletions.
44 changes: 22 additions & 22 deletions lang/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,22 +332,22 @@ guild = data
experimental.chain(first: id, second: guild)
`,
want: `[digraph {
array.from0
range1
filter2
"array.from0"
"range1"
"filter2"
// r._field == "id"
array.from0 -> range1
range1 -> filter2
"array.from0" -> "range1"
"range1" -> "filter2"
}
digraph {
array.from3
range4
filter5
"array.from3"
"range4"
"filter5"
// r._field == "guild"
array.from3 -> range4
range4 -> filter5
"array.from3" -> "range4"
"range4" -> "filter5"
}
]`,
},
Expand All @@ -368,24 +368,24 @@ data
|> filter(fn: (r) => r["_field"] == id)
`,
want: `[digraph {
array.from0
range1
filter2
"array.from0"
"range1"
"filter2"
// r._field == "id"
sort3
"sort3"
array.from0 -> range1
range1 -> filter2
filter2 -> sort3
"array.from0" -> "range1"
"range1" -> "filter2"
"filter2" -> "sort3"
}
digraph {
array.from4
range5
filter6
"array.from4"
"range5"
"filter6"
// r._field == "id"
array.from4 -> range5
range5 -> filter6
"array.from4" -> "range5"
"range5" -> "filter6"
}
]`,
},
Expand Down
2 changes: 1 addition & 1 deletion libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/aggregate_window_max_test.flux": "8ee5d927ef375e7ac3687ea1ea00a0e8add969c31357362d0032ee079c2ac906",
"stdlib/universe/aggregate_window_mean_test.flux": "ba33848419748489bc330c67dfaa344fac757f8af782879c71e08b345f0f652f",
"stdlib/universe/aggregate_window_median_test.flux": "2791ed98310aae23deab7cb79836e60539cd29810c67910c30e4b891356a2698",
"stdlib/universe/aggregate_window_test.flux": "79ae9443c57f613fa7917b727f80f6a3fc985fcd95006ab2804c1f5d25c9b9d3",
"stdlib/universe/aggregate_window_test.flux": "d415e0613744096fd7f624bdef2faf1fdae480b83feee2298b672385de14a8c8",
"stdlib/universe/cmo_test.flux": "3793f5cf21ae42879d6789cdc36bbcf29ddb99bf16af8a96775a9415fd8d1c09",
"stdlib/universe/columns_test.flux": "7d116d2f875b7bc1d6ec47b61b3166abb91759788dee6032ad6d86874a58abed",
"stdlib/universe/contains_test.flux": "a1890287ebc1ad6d8c84561f7e0cb725048a1e287f49f5dceac196866a20ee01",
Expand Down
10 changes: 8 additions & 2 deletions plan/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type formatter struct {
p *Spec
}

func formatAsDOT(id NodeID) string {
// The DOT language does not allow "." or "/" in IDs
// so quote the node IDs.
return fmt.Sprintf("%q", id)
}

func (f formatter) Format(fs fmt.State, c rune) {
// Panicking while producing debug output is frustrating, so catch any panics and
// continue if that happens.
Expand All @@ -54,7 +60,7 @@ func (f formatter) Format(fs fmt.State, c rune) {
_, _ = fmt.Fprintf(fs, "digraph {\n")
var edges []string
_ = f.p.BottomUpWalk(func(pn Node) error {
_, _ = fmt.Fprintf(fs, " %v\n", pn.ID())
_, _ = fmt.Fprintf(fs, " %v\n", formatAsDOT(pn.ID()))
if f.withDetails {
details := ""
if d, ok := pn.ProcedureSpec().(Detailer); ok {
Expand All @@ -78,7 +84,7 @@ func (f formatter) Format(fs fmt.State, c rune) {
}
}
for _, pred := range pn.Predecessors() {
edges = append(edges, fmt.Sprintf(" %v -> %v", pred.ID(), pn.ID()))
edges = append(edges, fmt.Sprintf(" %v -> %v", formatAsDOT(pred.ID()), formatAsDOT(pn.ID())))
}
return nil
})
Expand Down
12 changes: 6 additions & 6 deletions plan/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestFormatted(t *testing.T) {
},
},
want: `digraph {
from
filter
"from"
"filter"
// r._value > 5.000000
from -> filter
"from" -> "filter"
}
`,
},
Expand Down Expand Up @@ -83,12 +83,12 @@ func TestFormatted(t *testing.T) {
},
},
want: `digraph {
source
merge
"source"
"merge"
// *** spec details ***
// ParallelMergeFactor: 8
source -> merge
"source" -> "merge"
}
`,
},
Expand Down
42 changes: 26 additions & 16 deletions plan/heuristic_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plan

import (
"context"
"fmt"
"sort"

"github.com/influxdata/flux/codes"
Expand Down Expand Up @@ -48,10 +49,20 @@ func applyRule(ctx context.Context, spec *Spec, rule Rule, node Node) (Node, boo
return nil, false, err
} else if changed {
if newNode == nil {
return nil, false, errors.Newf(codes.Internal, "rule %q returned a nil plan node even though it seems to have changed the plan", rule.Name())
return nil, false, errors.Newf(
codes.Internal,
"rule %q returned a nil plan node even though it seems to have changed the plan",
rule.Name(),
)
}
testing.MarkInvokedPlannerRule(ctx, rule.Name())
updateSuccessors(spec, node, newNode)
if err := updateSuccessors(spec, node, newNode); err != nil {
return node, false, errors.Wrap(
err,
codes.Internal,
fmt.Sprintf("updating successors after applying rule %q", rule.Name()),
)
}
return newNode, true, nil
}

Expand Down Expand Up @@ -158,33 +169,32 @@ func (p *heuristicPlanner) Plan(ctx context.Context, inputPlan *Spec) (*Spec, er
// node becomes newNode
// / \ / \
// D E D' E' <-- predecessors
func updateSuccessors(plan *Spec, oldNode, newNode Node) {
func updateSuccessors(plan *Spec, oldNode, newNode Node) error {
// no need to update successors if the node hasn't actually changed
if oldNode == newNode {
return
return nil
}

newNode.ClearSuccessors()

if len(oldNode.Successors()) == 0 {
// This is a new root node.
// This is a new root (sink) node.
plan.Replace(oldNode, newNode)
return
return nil
}

for _, succ := range oldNode.Successors() {
found := false
for i, succPred := range succ.Predecessors() {
if succPred == oldNode {
found = true
succ.Predecessors()[i] = newNode
}
}

if !found {
panic("Inconsistent plan graph: successor does not have edge back to predecessor")
i := IndexOfNode(oldNode, succ.Predecessors())
if i < 0 {
return errors.Newf(
codes.Internal,
"inconsistent plan graph; successor %q does not have edge back to predecessor %q",
succ.ID(), oldNode.ID(),
)
}
succ.Predecessors()[i] = newNode
}

newNode.AddSuccessors(oldNode.Successors()...)
return nil
}
17 changes: 17 additions & 0 deletions plan/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,23 @@ func ReplaceNode(oldNode, newNode Node) {
oldNode.ClearPredecessors()
}

// ReplacePhysicalNodes accepts a connected group of nodes that has a single output and
// a single input, and replaces them with a single node with the predecessors of the old input node.
// Note that the planner has a convention of connecting successors itself
// (rather than having the rules doing it) so the old output's successors
// remain unconnected.
func ReplacePhysicalNodes(ctx context.Context, oldOutputNode, oldInputNode Node, name string, newSpec PhysicalProcedureSpec) Node {
newNode := CreateUniquePhysicalNode(ctx, name, newSpec)

newNode.AddPredecessors(oldInputNode.Predecessors()...)
for _, pred := range oldInputNode.Predecessors() {
i := IndexOfNode(oldInputNode, pred.Successors())
pred.Successors()[i] = newNode
}

return newNode
}

type WindowSpec struct {
Every flux.Duration
Period flux.Duration
Expand Down
19 changes: 6 additions & 13 deletions stdlib/universe/aggregate_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,18 +872,14 @@ func (a AggregateWindowRule) Rewrite(ctx context.Context, node plan.Node) (plan.
if !a.isValidWindowSpec(windowSpec) {
return node, false, nil
}
parentNode := windowNode.Predecessors()[0]

parentNode.ClearSuccessors()
newNode := plan.CreateUniquePhysicalNode(ctx, "aggregateWindow", &AggregateWindowProcedureSpec{
newSpec := &AggregateWindowProcedureSpec{
WindowSpec: windowSpec,
AggregateKind: aggregateNode.Kind(),
ValueCol: valueCol,
UseStart: useStart,
ForceAggregate: false,
})
parentNode.AddSuccessors(newNode)
newNode.AddPredecessors(parentNode)
}
newNode := plan.ReplacePhysicalNodes(ctx, node, windowNode, "aggregateWindow", newSpec)
return newNode, true, nil
}

Expand Down Expand Up @@ -995,17 +991,14 @@ func (a AggregateWindowCreateEmptyRule) Rewrite(ctx context.Context, node plan.N
if !a.isValidWindowSpec(windowSpec) {
return node, false, nil
}
parentNode := windowNode.Predecessors()[0]

parentNode.ClearSuccessors()
newNode := plan.CreateUniquePhysicalNode(ctx, "aggregateWindow", &AggregateWindowProcedureSpec{
newSpec := &AggregateWindowProcedureSpec{
WindowSpec: windowSpec,
AggregateKind: aggregateNode.Kind(),
ValueCol: valueCol,
UseStart: useStart,
ForceAggregate: true,
})
parentNode.AddSuccessors(newNode)
newNode.AddPredecessors(parentNode)
}
newNode := plan.ReplacePhysicalNodes(ctx, node, windowNode, "aggregateWindow", newSpec)
return newNode, true, nil
}
64 changes: 64 additions & 0 deletions stdlib/universe/aggregate_window_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package universe_test

import "array"
import "csv"
import "internal/debug"
import "testing"
import "testing/expect"
import "planner"

sampleData = [
Expand Down Expand Up @@ -520,3 +522,65 @@ testcase max_null_windows {

testing.diff(got, want)
}

testcase aggregate_window_create_empty_predecessor_multi_successor {
expect.planner(rules: ["AggregateWindowCreateEmptyRule": 1])

data =
array.from(rows: sampleData)
|> range(start: 2019-11-25T00:00:00Z, stop: 2019-11-25T00:01:00Z)
|> group(columns: ["t0", "_start", "_stop"])

// This additional successor to the input to aggregateWindow caused trouble
// for the planner, so we are just verifying that the rule is applied
// and the query succeeds.
data
|> debug.sink()

got =
data
|> aggregateWindow(fn: count, every: 1m)
|> drop(columns: ["_start", "_stop"])
want =
array.from(
rows: [
{_time: 2019-11-25T00:01:00Z, t0: "a-0", _value: 5},
{_time: 2019-11-25T00:01:00Z, t0: "a-1", _value: 5},
{_time: 2019-11-25T00:01:00Z, t0: "a-2", _value: 5},
],
)
|> group(columns: ["t0", "_start", "_stop"])

testing.diff(want, got)
}

testcase aggregate_window_predecessor_multi_successor {
expect.planner(rules: ["AggregateWindowRule": 1])

data =
array.from(rows: sampleData)
|> range(start: 2019-11-25T00:00:00Z, stop: 2019-11-25T00:01:00Z)
|> group(columns: ["t0", "_start", "_stop"])

// This additional successor to the input to aggregateWindow caused trouble
// for the planner, so we are just verifying that the rule is applied
// and the query succeeds.
data
|> debug.sink()

got =
data
|> aggregateWindow(fn: count, every: 1m, createEmpty: false)
|> drop(columns: ["_start", "_stop"])
want =
array.from(
rows: [
{_time: 2019-11-25T00:01:00Z, t0: "a-0", _value: 5},
{_time: 2019-11-25T00:01:00Z, t0: "a-1", _value: 5},
{_time: 2019-11-25T00:01:00Z, t0: "a-2", _value: 5},
],
)
|> group(columns: ["t0", "_start", "_stop"])

testing.diff(want, got)
}

0 comments on commit 35a1a95

Please sign in to comment.