Skip to content

Commit

Permalink
planner: add trace for join reorder (#30394)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Dec 13, 2021
1 parent b8dcc09 commit a532973
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 19 deletions.
26 changes: 23 additions & 3 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
assertRuleName string
assertRuleSteps []assertTraceStep
}{
{
sql: "select * from (t t1, t t2, t t3,t t4) union all select * from (t t5, t t6, t t7,t t8)",
flags: []uint64{flagBuildKeyInfo, flagPrunColumns, flagDecorrelate, flagPredicatePushDown, flagEliminateOuterJoin, flagJoinReOrder},
assertRuleName: "join_reorder",
assertRuleSteps: []assertTraceStep{
{
assertAction: "join order becomes [((t1*t2)*(t3*t4)),((t5*t6)*(t7*t8))] from original [(((t1*t2)*t3)*t4),(((t5*t6)*t7)*t8)]",
assertReason: "join cost during reorder: [[t1, cost:10000],[t2, cost:10000],[t3, cost:10000],[t4, cost:10000],[t5, cost:10000],[t6, cost:10000],[t7, cost:10000],[t8, cost:10000]]",
},
},
},
{
sql: "select * from t t1, t t2, t t3 where t1.a=t2.a and t3.a=t2.a and t1.a=t3.a",
flags: []uint64{flagBuildKeyInfo, flagPrunColumns, flagDecorrelate, flagPredicatePushDown, flagEliminateOuterJoin, flagJoinReOrder},
assertRuleName: "join_reorder",
assertRuleSteps: []assertTraceStep{
{
assertAction: "join order becomes ((t1*t2)*t3) from original ((t1*t2)*t3)",
assertReason: "join cost during reorder: [[((t1*t2)*t3), cost:58125],[(t1*t2), cost:32500],[(t1*t3), cost:32500],[t1, cost:10000],[t2, cost:10000],[t3, cost:10000]]",
},
},
},
{
sql: "select min(distinct a) from t group by a",
flags: []uint64{flagBuildKeyInfo, flagEliminateAgg},
Expand Down Expand Up @@ -215,10 +237,8 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
_, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
assert := false
Expand Down
154 changes: 148 additions & 6 deletions planner/core/rule_join_reorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
package core

import (
"bytes"
"context"
"fmt"
"sort"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/tracing"
)

// extractJoinGroup extracts all the join nodes connected with continuous
Expand Down Expand Up @@ -56,16 +61,21 @@ type jrNode struct {
}

func (s *joinReOrderSolver) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) {
return s.optimizeRecursive(p.SCtx(), p)
tracer := &joinReorderTrace{cost: map[string]float64{}, opt: opt}
tracer.traceJoinReorder(p)
p, err := s.optimizeRecursive(p.SCtx(), p, tracer)
tracer.traceJoinReorder(p)
appendJoinReorderTraceStep(tracer, p, opt)
return p, err
}

// optimizeRecursive recursively collects join groups and applies join reorder algorithm for each group.
func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalPlan) (LogicalPlan, error) {
func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalPlan, tracer *joinReorderTrace) (LogicalPlan, error) {
var err error
curJoinGroup, eqEdges, otherConds := extractJoinGroup(p)
if len(curJoinGroup) > 1 {
for i := range curJoinGroup {
curJoinGroup[i], err = s.optimizeRecursive(ctx, curJoinGroup[i])
curJoinGroup[i], err = s.optimizeRecursive(ctx, curJoinGroup[i], tracer)
if err != nil {
return nil, err
}
Expand All @@ -80,13 +90,13 @@ func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalP
baseSingleGroupJoinOrderSolver: baseGroupSolver,
eqEdges: eqEdges,
}
p, err = groupSolver.solve(curJoinGroup)
p, err = groupSolver.solve(curJoinGroup, tracer)
} else {
dpSolver := &joinReorderDPSolver{
baseSingleGroupJoinOrderSolver: baseGroupSolver,
}
dpSolver.newJoin = dpSolver.newJoinWithEdges
p, err = dpSolver.solve(curJoinGroup, expression.ScalarFuncs2Exprs(eqEdges))
p, err = dpSolver.solve(curJoinGroup, expression.ScalarFuncs2Exprs(eqEdges), tracer)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -114,7 +124,7 @@ func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalP
}
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
newChild, err := s.optimizeRecursive(ctx, child)
newChild, err := s.optimizeRecursive(ctx, child, tracer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,3 +204,135 @@ func (s *baseSingleGroupJoinOrderSolver) calcJoinCumCost(join LogicalPlan, lNode
func (*joinReOrderSolver) name() string {
return "join_reorder"
}

func appendJoinReorderTraceStep(tracer *joinReorderTrace, plan LogicalPlan, opt *logicalOptimizeOp) {
if len(tracer.initial) < 1 || len(tracer.final) < 1 {
return
}
action := fmt.Sprintf("join order becomes %v from original %v", tracer.final, tracer.initial)
reason := func() string {
buffer := bytes.NewBufferString("join cost during reorder: [")
var joins []string
for join := range tracer.cost {
joins = append(joins, join)
}
sort.Strings(joins)
for i, join := range joins {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(fmt.Sprintf("[%s, cost:%v]", join, tracer.cost[join]))
}
buffer.WriteString("]")
return buffer.String()
}()
opt.appendStepToCurrent(plan.ID(), plan.TP(), reason, action)
}

func allJoinOrderToString(tt []*tracing.LogicalPlanTrace) string {
if len(tt) == 1 {
return joinOrderToString(tt[0])
}
buffer := bytes.NewBufferString("[")
for i, t := range tt {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(joinOrderToString(t))
}
buffer.WriteString("]")
return buffer.String()
}

// joinOrderToString let Join(DataSource, DataSource) become '(t1*t2)'
func joinOrderToString(t *tracing.LogicalPlanTrace) string {
if t.TP == plancodec.TypeJoin {
buffer := bytes.NewBufferString("(")
for i, child := range t.Children {
if i > 0 {
buffer.WriteString("*")
}
buffer.WriteString(joinOrderToString(child))
}
buffer.WriteString(")")
return buffer.String()
} else if t.TP == plancodec.TypeDataSource {
return t.ExplainInfo[6:]
}
return ""
}

// extractJoinAndDataSource will only keep join and dataSource operator and remove other operators.
// For example: Proj->Join->(Proj->DataSource, DataSource) will become Join->(DataSource, DataSource)
func extractJoinAndDataSource(t *tracing.LogicalPlanTrace) []*tracing.LogicalPlanTrace {
roots := findRoots(t)
if len(roots) < 1 {
return nil
}
var rr []*tracing.LogicalPlanTrace
for _, root := range roots {
simplify(root)
rr = append(rr, root)
}
return rr
}

// simplify only keeps Join and DataSource operators, and discard other operators.
func simplify(node *tracing.LogicalPlanTrace) {
if len(node.Children) < 1 {
return
}
for valid := false; !valid; {
valid = true
newChildren := make([]*tracing.LogicalPlanTrace, 0)
for _, child := range node.Children {
if child.TP != plancodec.TypeDataSource && child.TP != plancodec.TypeJoin {
newChildren = append(newChildren, child.Children...)
valid = false
} else {
newChildren = append(newChildren, child)
}
}
node.Children = newChildren
}
for _, child := range node.Children {
simplify(child)
}
}

func findRoots(t *tracing.LogicalPlanTrace) []*tracing.LogicalPlanTrace {
if t.TP == plancodec.TypeJoin || t.TP == plancodec.TypeDataSource {
return []*tracing.LogicalPlanTrace{t}
}
var r []*tracing.LogicalPlanTrace
for _, child := range t.Children {
r = append(r, findRoots(child)...)
}
return r
}

type joinReorderTrace struct {
opt *logicalOptimizeOp
initial string
final string
cost map[string]float64
}

func (t *joinReorderTrace) traceJoinReorder(p LogicalPlan) {
if t == nil || t.opt == nil || t.opt.tracer == nil {
return
}
if len(t.initial) > 0 {
t.final = allJoinOrderToString(extractJoinAndDataSource(p.buildLogicalPlanTrace(p)))
return
}
t.initial = allJoinOrderToString(extractJoinAndDataSource(p.buildLogicalPlanTrace(p)))
}

func (t *joinReorderTrace) appendLogicalJoinCost(join LogicalPlan, cost float64) {
if t == nil || t.opt == nil || t.opt.tracer == nil {
return
}
joinMapKey := allJoinOrderToString(extractJoinAndDataSource(join.buildLogicalPlanTrace(join)))
t.cost[joinMapKey] = cost
}
11 changes: 7 additions & 4 deletions planner/core/rule_join_reorder_dp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ type joinGroupNonEqEdge struct {
expr expression.Expression
}

func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expression.Expression) (LogicalPlan, error) {
func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expression.Expression, tracer *joinReorderTrace) (LogicalPlan, error) {
for _, node := range joinGroup {
_, err := node.recursiveDeriveStats(nil)
if err != nil {
return nil, err
}
cost := s.baseNodeCumCost(node)
s.curJoinGroup = append(s.curJoinGroup, &jrNode{
p: node,
cumCost: s.baseNodeCumCost(node),
cumCost: cost,
})
tracer.appendLogicalJoinCost(node, cost)
}
adjacents := make([][]int, len(s.curJoinGroup))
totalEqEdges := make([]joinGroupEqEdge, 0, len(eqConds))
Expand Down Expand Up @@ -120,7 +122,7 @@ func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expressio
totalNonEqEdges = append(totalNonEqEdges[:i], totalNonEqEdges[i+1:]...)
}
// Do DP on each sub graph.
join, err := s.dpGraph(visitID2NodeID, nodeID2VisitID, joinGroup, totalEqEdges, subNonEqEdges)
join, err := s.dpGraph(visitID2NodeID, nodeID2VisitID, joinGroup, totalEqEdges, subNonEqEdges, tracer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,7 +161,7 @@ func (s *joinReorderDPSolver) bfsGraph(startNode int, visited []bool, adjacents
// It implements the traditional join reorder algorithm: DP by subset using the following formula:
// bestPlan[S:set of node] = the best one among Join(bestPlan[S1:subset of S], bestPlan[S2: S/S1])
func (s *joinReorderDPSolver) dpGraph(visitID2NodeID, nodeID2VisitID []int, joinGroup []LogicalPlan,
totalEqEdges []joinGroupEqEdge, totalNonEqEdges []joinGroupNonEqEdge) (LogicalPlan, error) {
totalEqEdges []joinGroupEqEdge, totalNonEqEdges []joinGroupNonEqEdge, tracer *joinReorderTrace) (LogicalPlan, error) {
nodeCnt := uint(len(visitID2NodeID))
bestPlan := make([]*jrNode, 1<<nodeCnt)
// bestPlan[s] is nil can be treated as bestCost[s] = +inf.
Expand Down Expand Up @@ -192,6 +194,7 @@ func (s *joinReorderDPSolver) dpGraph(visitID2NodeID, nodeID2VisitID []int, join
return nil, err
}
curCost := s.calcJoinCumCost(join, bestPlan[sub], bestPlan[remain])
tracer.appendLogicalJoinCost(join, curCost)
if bestPlan[nodeBitmap] == nil {
bestPlan[nodeBitmap] = &jrNode{
p: join,
Expand Down
4 changes: 2 additions & 2 deletions planner/core/rule_join_reorder_dp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s *testJoinReorderDPSuite) TestDPReorderTPCHQ5(c *C) {
},
newJoin: s.newMockJoin,
}
result, err := solver.solve(joinGroups, eqConds)
result, err := solver.solve(joinGroups, eqConds, nil)
c.Assert(err, IsNil)
c.Assert(s.planToString(result), Equals, "MockJoin{supplier, MockJoin{lineitem, MockJoin{orders, MockJoin{customer, MockJoin{nation, region}}}}}")
}
Expand All @@ -212,7 +212,7 @@ func (s *testJoinReorderDPSuite) TestDPReorderAllCartesian(c *C) {
},
newJoin: s.newMockJoin,
}
result, err := solver.solve(joinGroup, nil)
result, err := solver.solve(joinGroup, nil, nil)
c.Assert(err, IsNil)
c.Assert(s.planToString(result), Equals, "MockJoin{MockJoin{a, b}, MockJoin{c, d}}")
}
11 changes: 7 additions & 4 deletions planner/core/rule_join_reorder_greedy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,26 @@ type joinReorderGreedySolver struct {
//
// For the nodes and join trees which don't have a join equal condition to
// connect them, we make a bushy join tree to do the cartesian joins finally.
func (s *joinReorderGreedySolver) solve(joinNodePlans []LogicalPlan) (LogicalPlan, error) {
func (s *joinReorderGreedySolver) solve(joinNodePlans []LogicalPlan, tracer *joinReorderTrace) (LogicalPlan, error) {
for _, node := range joinNodePlans {
_, err := node.recursiveDeriveStats(nil)
if err != nil {
return nil, err
}
cost := s.baseNodeCumCost(node)
s.curJoinGroup = append(s.curJoinGroup, &jrNode{
p: node,
cumCost: s.baseNodeCumCost(node),
cumCost: cost,
})
tracer.appendLogicalJoinCost(node, cost)
}
sort.SliceStable(s.curJoinGroup, func(i, j int) bool {
return s.curJoinGroup[i].cumCost < s.curJoinGroup[j].cumCost
})

var cartesianGroup []LogicalPlan
for len(s.curJoinGroup) > 0 {
newNode, err := s.constructConnectedJoinTree()
newNode, err := s.constructConnectedJoinTree(tracer)
if err != nil {
return nil, err
}
Expand All @@ -68,7 +70,7 @@ func (s *joinReorderGreedySolver) solve(joinNodePlans []LogicalPlan) (LogicalPla
return s.makeBushyJoin(cartesianGroup), nil
}

func (s *joinReorderGreedySolver) constructConnectedJoinTree() (*jrNode, error) {
func (s *joinReorderGreedySolver) constructConnectedJoinTree(tracer *joinReorderTrace) (*jrNode, error) {
curJoinTree := s.curJoinGroup[0]
s.curJoinGroup = s.curJoinGroup[1:]
for {
Expand All @@ -86,6 +88,7 @@ func (s *joinReorderGreedySolver) constructConnectedJoinTree() (*jrNode, error)
return nil, err
}
curCost := s.calcJoinCumCost(newJoin, curJoinTree, node)
tracer.appendLogicalJoinCost(newJoin, curCost)
if bestCost > curCost {
bestCost = curCost
bestJoin = newJoin
Expand Down

0 comments on commit a532973

Please sign in to comment.