Skip to content

Commit

Permalink
feat(plan): add a rule to remove redundant sort nodes (#5007)
Browse files Browse the repository at this point in the history
This code adds a planner rule behind a feature flag that will remove redundant
sort nodes from the query plan.
  • Loading branch information
Christopher M. Wolff authored Jul 27, 2022
1 parent cad109b commit 2914ff7
Show file tree
Hide file tree
Showing 17 changed files with 409 additions and 38 deletions.
11 changes: 11 additions & 0 deletions execute/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,17 @@ func (c *SimpleAggregateConfig) ReadArgs(args flux.Arguments) error {
return nil
}

// PassThroughAttribute reports whether the attribute named by attrKey is
// carried from this transformation's input to its output. The planner calls
// it through the PassThroughAttributer interface.
//
// Only the collation attribute is passed through: each output table of an
// aggregate holds a single row, so the input ordering is trivially preserved.
func (c SimpleAggregateConfig) PassThroughAttribute(attrKey string) bool {
	return attrKey == plan.CollationKey
}

func NewSimpleAggregateTransformation(ctx context.Context, id DatasetID, agg SimpleAggregate, config SimpleAggregateConfig, mem memory.Allocator) (Transformation, Dataset, error) {
if feature.AggregateTransformationTransport().Enabled(ctx) {
tr := &simpleAggregateTransformation2{
Expand Down
1 change: 1 addition & 0 deletions execute/executetest/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var testFlags = map[string]interface{}{
"optimizeStateTracking": true,
"optimizeSetTransformation": true,
"experimentalTestingDiff": true,
"removeRedundantSortNodes": true,
}

// TestFlagger is a map from string keys to values of any type.
// NOTE(review): presumably keyed by feature-flag key, like testFlags above — confirm.
type TestFlagger map[string]interface{}
Expand Down
10 changes: 10 additions & 0 deletions execute/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ type indexSelectorTransformation struct {
selector IndexSelector
}

// PassThroughAttribute reports whether the attribute named by attrKey is
// carried from this transformation's input to its output. The planner calls
// it through the PassThroughAttributer interface.
//
// Only the collation attribute is passed through, because selector functions
// preserve the collation of their input rows.
func (c SelectorConfig) PassThroughAttribute(attrKey string) bool {
	return attrKey == plan.CollationKey
}

func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a memory.Allocator) (*rowSelectorTransformation, Dataset) {
cache := NewTableBuilderCache(a)
d := NewDataset(id, mode, cache)
Expand Down
14 changes: 14 additions & 0 deletions internal/feature/flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions internal/feature/flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,9 @@
key: experimentalTestingDiff
default: false
contact: Jonathan Sternberg

- name: Remove Redundant Sort Nodes
description: Planner will remove sort nodes when tables are already sorted
key: removeRedundantSortNodes
default: false
contact: Chris Wolff
1 change: 1 addition & 0 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/skew_test.flux": "7782d41c563c77ba9f4176fa1b5f4f6107e418b7ea301e4896398dbcb514315a",
"stdlib/universe/sort2_test.flux": "1d2043c0d0b38abb8dc61fc1baa6d6052fae63fea55cc6e67fd1600911513bdb",
"stdlib/universe/sort_limit_test.flux": "c595da9613faf8734932d8c2e63291517b7842b5656f795b32d50e987493ec2a",
"stdlib/universe/sort_rules_test.flux": "0770ae42e99b04167ca5bef8340a310b224baf1ba1928997273de9663b64684a",
"stdlib/universe/sort_test.flux": "f69ebb5972762078e759af3c1cd3d852431a569dce74f3c379709c9e174bfa31",
"stdlib/universe/spread_test.flux": "1ddf25e4d86b6365da254229fc8f77bd838c24a79e6a08c9c4c50330ace0a6a3",
"stdlib/universe/state_count_test.flux": "c52be54b9656a2c51e1cfb5115d0a7ac4b06fefa5cd03d90f1a7e193f77b17a7",
Expand Down
76 changes: 50 additions & 26 deletions plan/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,24 @@ func CheckRequiredAttributes(node *PhysicalPlanNode) error {

for i, reqAttrMap := range reqAttrsSlice {
for _, reqAttr := range reqAttrMap {
ppred := node.Predecessors()[i].(*PhysicalPlanNode)
haveAttr, n := getOutputAttributeWithNode(node.Predecessors()[i].(*PhysicalPlanNode), reqAttr.Key())
pred := node.Predecessors()[i]
haveAttr, n := getOutputAttributeWithNode(pred, reqAttr.Key())
if haveAttr == nil {
return errors.Newf(codes.Internal,
"attribute %q, required by %q, is missing from predecessor %q",
msg := fmt.Sprintf("attribute %q, required by %q, is missing from predecessor %q",
reqAttr.Key(), node.ID(), n.ID(),
)
if _, ok := n.(*LogicalNode); ok {
// Logical nodes do not have attributes
msg += " which is a logical node"
}
return errors.New(codes.Internal, msg)
}

if !reqAttr.SatisfiedBy(haveAttr) {
return errors.Newf(codes.Internal,
"node %q requires attribute %v, which is not satisfied by predecessor %q, "+
"which has attribute %v",
node.ID(), reqAttr, ppred.ID(), haveAttr,
node.ID(), reqAttr, pred.ID(), haveAttr,
)
}
}
Expand All @@ -112,29 +116,40 @@ func CheckRequiredAttributes(node *PhysicalPlanNode) error {
// sort |> filter
// The "filter" node will still provide the collation attribute, even though it's
// the "sort" node that actually does the collating.
func GetOutputAttribute(node *PhysicalPlanNode, attrKey string) PhysicalAttr {
func GetOutputAttribute(node Node, attrKey string) PhysicalAttr {
attr, _ := getOutputAttributeWithNode(node, attrKey)
return attr
}

func getOutputAttributeWithNode(node *PhysicalPlanNode, attrKey string) (PhysicalAttr, *PhysicalPlanNode) {
if attr, ok := node.outputAttrs()[attrKey]; ok {
func getOutputAttributeWithNode(node Node, attrKey string) (PhysicalAttr, Node) {
pn, ok := node.(*PhysicalPlanNode)
if !ok {
return nil, node
}

if attr, ok := pn.outputAttrs()[attrKey]; ok {
return attr, nil
}

if node.passesThroughAttr(attrKey) && len(node.Predecessors()) == 1 {
if pn.passesThroughAttr(attrKey) && len(pn.Predecessors()) == 1 {
// TODO(cwolff): consider what it means for nodes with multiple predecessors
// (e.g. join or union) to pass on attributes.
return getOutputAttributeWithNode(node.Predecessors()[0].(*PhysicalPlanNode), attrKey)
return getOutputAttributeWithNode(node.Predecessors()[0], attrKey)
}

return nil, node
}

// CheckSuccessorsMustRequire will return an error if the node has an output attribute
// that must be required by a successor, and nil otherwise. E.g., the parallel-run
// attribute is like this in that it must be required by a merge node.
// that must be required by *all* successors, but there exists some node that does not
// require it.
//
// E.g., the parallel-run attribute is like this in that it must be required by a merge node.
// This function will walk forward through successors to find the requiring node.
//
// The desired effect here is that if an attribute must be required by successors,
// we walk forward through the graph and ensure that it is required on every branch that
// succeeds the given node, with only pass-through nodes in between.
func CheckSuccessorsMustRequire(node *PhysicalPlanNode) error {
for _, attr := range node.outputAttrs() {
if !attr.SuccessorsMustRequire() {
Expand All @@ -150,18 +165,22 @@ func CheckSuccessorsMustRequire(node *PhysicalPlanNode) error {
}

for _, succ := range node.Successors() {
reqd, n := requiredBySuccessor(attr, node, succ.(*PhysicalPlanNode))
reqd, n := requiredBySuccessor(attr, node, succ)
if reqd {
continue
}

if n != nil {
msg := fmt.Sprintf("plan node %q has attribute %q that must be required by successors, "+
"but it is not required or propagated by successor %q",
node.ID(), attr.Key(), n.ID(),
)
if _, ok := n.(*LogicalNode); ok {
msg += " which is a logical node"
}
return &flux.Error{
Code: codes.Internal,
Msg: fmt.Sprintf("plan node %q has attribute %q that must be required by successors, "+
"but it is not required or propagated by successor %q",
node.ID(), attr.Key(), n.ID(),
),
Msg: msg,
}
}

Expand All @@ -179,22 +198,27 @@ func CheckSuccessorsMustRequire(node *PhysicalPlanNode) error {
}

// requiredBySuccessor returns true if the given attribute is required by succ or
// succ passes through the attribute and one of its successors requires the attribute.
// If the attribute is not required, this function returns false and the node that neither passes
// along nor requires the attribute.
func requiredBySuccessor(requiredAttr PhysicalAttr, node, succ *PhysicalPlanNode) (bool, *PhysicalPlanNode) {
i := indexOfNode(node, succ.Predecessors())
if _, ok := succ.requiredAttrs()[i][requiredAttr.Key()]; ok {
// succ passes through the attribute and *all* of its successors require the attribute.
// If the attribute is not required by some succeeding node, this function returns false
// and the node that neither passes along nor requires the attribute.
func requiredBySuccessor(requiredAttr PhysicalAttr, node, succ Node) (bool, Node) {
psucc, ok := succ.(*PhysicalPlanNode)
if !ok {
return false, succ
}

i := indexOfNode(node, psucc.Predecessors())
if _, ok := psucc.requiredAttrs()[i][requiredAttr.Key()]; ok {
return true, succ
}
if succ.passesThroughAttr(requiredAttr.Key()) {
if psucc.passesThroughAttr(requiredAttr.Key()) {
if len(succ.Successors()) == 0 {
return false, nil
}
// If this node does not require the attribute itself but passes it along,
// check that every one of its successors requires it.
for _, ssucc := range succ.Successors() {
if reqd, n := requiredBySuccessor(requiredAttr, succ, ssucc.(*PhysicalPlanNode)); !reqd {
for _, ssucc := range psucc.Successors() {
if reqd, n := requiredBySuccessor(requiredAttr, succ, ssucc); !reqd {
return false, n
}
}
Expand Down
61 changes: 57 additions & 4 deletions plan/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,32 @@ func TestCheckRequiredAttributes(t *testing.T) {
},
err: `attribute "mock-attr", required by "require-attr", is missing from predecessor "no-attr"`,
},
{
name: "logical node does not provide attributes",
input: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("logical", plantest.MockProcedureSpec{}),
plantest.CreatePhysicalNode("passthru", plantest.MockProcedureSpec{
PassThroughAttributeFn: func(attrKey string) bool { return attrKey == mockAttrKey },
}),
plantest.CreatePhysicalNode("require-attr", plantest.MockProcedureSpec{
RequiredAttributesFn: func() []plan.PhysicalAttributes {
return []plan.PhysicalAttributes{
{
mockAttrKey: &mockAttr{},
},
}
},
}),
},
Edges: [][2]int{
{0, 1},
{1, 2},
},
},
err: `attribute "mock-attr", required by "require-attr", ` +
`is missing from predecessor "logical" which is a logical node`,
},
{
name: "attribute present but not satisfied",
input: &plantest.PlanSpec{
Expand Down Expand Up @@ -160,7 +186,10 @@ func TestCheckRequiredAttributes(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
spec := plantest.CreatePlanSpec(tc.input)
err := spec.BottomUpWalk(func(node plan.Node) error {
return plan.CheckRequiredAttributes(node.(*plan.PhysicalPlanNode))
if pn, ok := node.(*plan.PhysicalPlanNode); ok {
return plan.CheckRequiredAttributes(pn)
}
return nil
})
if tc.err == "" {
require.NoError(t, err)
Expand Down Expand Up @@ -211,7 +240,7 @@ func TestCheckSuccessorsMustRequire(t *testing.T) {
},
},
{
name: "successor does not require",
name: "attr not required by successor",
input: &plantest.PlanSpec{
Nodes: []plan.Node{
plantest.CreatePhysicalNode("successor-does-not-require", plantest.MockProcedureSpec{
Expand All @@ -224,7 +253,7 @@ func TestCheckSuccessorsMustRequire(t *testing.T) {
plantest.CreatePhysicalNode("passthru", plantest.MockProcedureSpec{
PassThroughAttributeFn: func(attrKey string) bool { return attrKey == mockAttrKey },
}),
plantest.CreatePhysicalNode("requires-attr", plantest.MockProcedureSpec{}),
plantest.CreatePhysicalNode("does-not-require", plantest.MockProcedureSpec{}),
},
Edges: [][2]int{
{0, 1},
Expand Down Expand Up @@ -272,6 +301,27 @@ func TestCheckSuccessorsMustRequire(t *testing.T) {
err: `plan node "successor-must-require" has attribute "mock-attr" that must be required by successors, ` +
`but it is not required or propagated by successor "does-not-require-attr"`,
},
{
name: "successor is logical node",
input: &plantest.PlanSpec{
Nodes: []plan.Node{
plantest.CreatePhysicalNode("successor-must-require", plantest.MockProcedureSpec{
OutputAttributesFn: func() plan.PhysicalAttributes {
return plan.PhysicalAttributes{
mockAttrKey: &mockAttr{successorMustRequire: true},
}
},
}),
plan.CreateLogicalNode("logical", plantest.MockProcedureSpec{}),
},
Edges: [][2]int{
{0, 1},
},
},
err: `plan node "successor-must-require" has attribute "mock-attr" ` +
`that must be required by successors, but it is not required or propagated ` +
`by successor "logical" which is a logical node`,
},
{
name: "no requiring successor passthru",
input: &plantest.PlanSpec{
Expand Down Expand Up @@ -299,7 +349,10 @@ func TestCheckSuccessorsMustRequire(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
spec := plantest.CreatePlanSpec(tc.input)
err := spec.BottomUpWalk(func(node plan.Node) error {
return plan.CheckSuccessorsMustRequire(node.(*plan.PhysicalPlanNode))
if pn, ok := node.(*plan.PhysicalPlanNode); ok {
return plan.CheckSuccessorsMustRequire(pn)
}
return nil
})
if tc.err == "" {
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions stdlib/internal/debug/pass.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func (s *PassProcedureSpec) Copy() plan.ProcedureSpec {
return new(PassProcedureSpec)
}

// PassThroughAttribute implements the PassThroughAttributer interface used
// by the planner. It returns true unconditionally, without inspecting
// attrKey, so every attribute provided by the input to this transformation
// is reported as propagated to its output.
func (s *PassProcedureSpec) PassThroughAttribute(attrKey string) bool {
return true
}

// TriggerSpec implements plan.TriggerAwareProcedureSpec
func (s *PassProcedureSpec) TriggerSpec() plan.TriggerSpec {
return plan.NarrowTransformationTriggerSpec{}
Expand Down
8 changes: 4 additions & 4 deletions stdlib/join/sort_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func (SortMergeJoinPredicateRule) Rewrite(ctx context.Context, n plan.Node) (pla
predecessors := n.Predecessors()
n.ClearPredecessors()

makeSortNode := func(parentNode plan.Node, columns []string) *plan.PhysicalPlanNode {
makeSortNode := func(name string, parentNode plan.Node, columns []string) *plan.PhysicalPlanNode {
sortProc := universe.SortProcedureSpec{
Columns: columns,
}
sortNode := plan.CreateUniquePhysicalNode(ctx, "sortMergeJoin", &sortProc)
sortNode := plan.CreateUniquePhysicalNode(ctx, name, &sortProc)

sortNode.AddPredecessors(parentNode)
sortNode.AddSuccessors(n)
Expand All @@ -94,15 +94,15 @@ func (SortMergeJoinPredicateRule) Rewrite(ctx context.Context, n plan.Node) (pla
for _, pair := range spec.On {
columns = append(columns, pair.Left)
}
successors[0] = makeSortNode(predecessors[0], columns)
successors[0] = makeSortNode("sort_join_lhs", predecessors[0], columns)

successors = predecessors[1].Successors()

columns = make([]string, 0, len(spec.On))
for _, pair := range spec.On {
columns = append(columns, pair.Right)
}
successors[0] = makeSortNode(predecessors[1], columns)
successors[0] = makeSortNode("sort_join_rhs", predecessors[1], columns)

// Replace the spec so we don't end up trying to apply this rewrite forever
x := SortMergeJoinProcedureSpec(*spec)
Expand Down
Loading

0 comments on commit 2914ff7

Please sign in to comment.