Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix bug of plan digest is same when cop task store is different (#20054) #20076

Merged
merged 12 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions planner/core/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,12 @@ func EncodePlan(p Plan) string {
func (pn *planEncoder) encodePlanTree(p Plan) string {
pn.encodedPlans = make(map[int]bool)
pn.buf.Reset()
pn.encodePlan(p, true, 0)
pn.encodePlan(p, true, kv.TiKV, 0)
return plancodec.Compress(pn.buf.Bytes())
}

func (pn *planEncoder) encodePlan(p Plan, isRoot bool, depth int) {
var storeType kv.StoreType = kv.UnSpecified
if !isRoot {
switch copPlan := p.(type) {
case *PhysicalTableReader:
storeType = copPlan.StoreType
case *PhysicalTableScan:
storeType = copPlan.StoreType
default:
storeType = kv.TiKV
}
}
taskTypeInfo := plancodec.EncodeTaskType(isRoot, storeType)
func (pn *planEncoder) encodePlan(p Plan, isRoot bool, store kv.StoreType, depth int) {
taskTypeInfo := plancodec.EncodeTaskType(isRoot, store)
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(p.SCtx(), p)
rowCount := 0.0
if statsInfo := p.statsInfo(); statsInfo != nil {
Expand All @@ -86,29 +75,29 @@ func (pn *planEncoder) encodePlan(p Plan, isRoot bool, depth int) {
return
}
if !pn.encodedPlans[selectPlan.ID()] {
pn.encodePlan(selectPlan, isRoot, depth)
pn.encodePlan(selectPlan, isRoot, store, depth)
return
}
for _, child := range selectPlan.Children() {
if pn.encodedPlans[child.ID()] {
continue
}
pn.encodePlan(child.(PhysicalPlan), isRoot, depth)
pn.encodePlan(child.(PhysicalPlan), isRoot, store, depth)
}
switch copPlan := selectPlan.(type) {
case *PhysicalTableReader:
pn.encodePlan(copPlan.tablePlan, false, depth)
pn.encodePlan(copPlan.tablePlan, false, copPlan.StoreType, depth)
case *PhysicalIndexReader:
pn.encodePlan(copPlan.indexPlan, false, depth)
pn.encodePlan(copPlan.indexPlan, false, store, depth)
case *PhysicalIndexLookUpReader:
pn.encodePlan(copPlan.indexPlan, false, depth)
pn.encodePlan(copPlan.tablePlan, false, depth)
pn.encodePlan(copPlan.indexPlan, false, store, depth)
pn.encodePlan(copPlan.tablePlan, false, store, depth)
case *PhysicalIndexMergeReader:
for _, p := range copPlan.partialPlans {
pn.encodePlan(p, false, depth)
pn.encodePlan(p, false, store, depth)
}
if copPlan.tablePlan != nil {
pn.encodePlan(copPlan.tablePlan, false, depth)
pn.encodePlan(copPlan.tablePlan, false, store, depth)
}
}
}
Expand Down Expand Up @@ -147,34 +136,35 @@ func NormalizePlan(p Plan) (normalized, digest string) {
func (d *planDigester) normalizePlanTree(p PhysicalPlan) {
d.encodedPlans = make(map[int]bool)
d.buf.Reset()
d.normalizePlan(p, true, 0)
d.normalizePlan(p, true, kv.TiKV, 0)
}

func (d *planDigester) normalizePlan(p PhysicalPlan, isRoot bool, depth int) {
plancodec.NormalizePlanNode(depth, p.TP(), isRoot, p.ExplainNormalizedInfo(), &d.buf)
func (d *planDigester) normalizePlan(p PhysicalPlan, isRoot bool, store kv.StoreType, depth int) {
taskTypeInfo := plancodec.EncodeTaskTypeForNormalize(isRoot, store)
plancodec.NormalizePlanNode(depth, p.TP(), taskTypeInfo, p.ExplainNormalizedInfo(), &d.buf)
d.encodedPlans[p.ID()] = true

depth++
for _, child := range p.Children() {
if d.encodedPlans[child.ID()] {
continue
}
d.normalizePlan(child.(PhysicalPlan), isRoot, depth)
d.normalizePlan(child.(PhysicalPlan), isRoot, store, depth)
}
switch x := p.(type) {
case *PhysicalTableReader:
d.normalizePlan(x.tablePlan, false, depth)
d.normalizePlan(x.tablePlan, false, x.StoreType, depth)
case *PhysicalIndexReader:
d.normalizePlan(x.indexPlan, false, depth)
d.normalizePlan(x.indexPlan, false, store, depth)
case *PhysicalIndexLookUpReader:
d.normalizePlan(x.indexPlan, false, depth)
d.normalizePlan(x.tablePlan, false, depth)
d.normalizePlan(x.indexPlan, false, store, depth)
d.normalizePlan(x.tablePlan, false, store, depth)
case *PhysicalIndexMergeReader:
for _, p := range x.partialPlans {
d.normalizePlan(p, false, depth)
d.normalizePlan(p, false, store, depth)
}
if x.tablePlan != nil {
d.normalizePlan(x.tablePlan, false, depth)
d.normalizePlan(x.tablePlan, false, store, depth)
}
}
}
Expand Down
41 changes: 41 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -88,6 +89,46 @@ func (s *testPlanNormalize) TestNormalizedPlan(c *C) {
}
}

func (s *testPlanNormalize) TestNormalizedPlanForDiffStore(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int, c int, primary key(a))")
tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3)")

tbl, err := s.dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t1", L: "t1"})
c.Assert(err, IsNil)
// Set the hacked TiFlash replica for explain tests.
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

var input []string
var output []struct {
Digest string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
lastDigest := ""
for i, tt := range input {
tk.Se.GetSessionVars().PlanID = 0
tk.MustExec(tt)
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
ep, ok := info.Plan.(*core.Explain)
c.Assert(ok, IsTrue)
normalized, digest := core.NormalizePlan(ep.TargetPlan)
normalizedPlan, err := plancodec.DecodeNormalizedPlan(normalized)
normalizedPlanRows := getPlanRows(normalizedPlan)
c.Assert(err, IsNil)
s.testData.OnRecord(func() {
output[i].Digest = digest
output[i].Plan = normalizedPlanRows
})
compareStringSlice(c, normalizedPlanRows, output[i].Plan)
c.Assert(digest != lastDigest, IsTrue)
lastDigest = digest
}
}

func (s *testPlanNormalize) TestEncodeDecodePlan(c *C) {
if israce.RaceEnabled {
c.Skip("skip race test")
Expand Down
9 changes: 9 additions & 0 deletions planner/core/testdata/plan_normalized_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,14 @@
"create table t1_tmp (a int)",
"alter table t1_tmp add column c int"
]
},
{
"name": "TestNormalizedPlanForDiffStore",
"cases": [
"explain select /*+ read_from_storage(tiflash[t1]) */ * from t1",
"explain select /*+ read_from_storage(tikv[t1]) */ * from t1",
"explain select /*+ read_from_storage(tiflash[t1]) */ a+b from t1 where a+b < 1",
"explain select /*+ read_from_storage(tikv[t1]) */ a+b from t1 where a+b < 1"
]
}
]
37 changes: 37 additions & 0 deletions planner/core/testdata/plan_normalized_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,42 @@
]
}
]
},
{
"Name": "TestNormalizedPlanForDiffStore",
"Cases": [
{
"Digest": "63eab1c93f586cf9fbe71cbfa4ad212aadb019e3e477f2f6257d00d35e045980",
"Plan": [
" TableReader root ",
" └─TableScan cop[tiflash] table:t1, range:[?,?], keep order:false"
]
},
{
"Digest": "6dc9f1500bbea92b2446d58c1510bca2e78f0e9a6c721c76495b0cf6bfc95faa",
"Plan": [
" TableReader root ",
" └─TableScan cop table:t1, range:[?,?], keep order:false"
]
},
{
"Digest": "03f12d0f634596922b6ba2edab8d6565a36bc2264cea9613adeb506e32d6b901",
"Plan": [
" Projection root plus(test.t1.a, test.t1.b)",
" └─TableReader root ",
" └─Selection cop[tiflash] lt(plus(test.t1.a, test.t1.b), ?)",
" └─TableScan cop[tiflash] table:t1, range:[?,?], keep order:false"
]
},
{
"Digest": "5f2f4343d1cf9bbd0893f78c01657307fdebadacbd0b9e60e4b5cca27656b739",
"Plan": [
" Projection root plus(test.t1.a, test.t1.b)",
" └─TableReader root ",
" └─Selection cop lt(plus(test.t1.a, test.t1.b), ?)",
" └─TableScan cop table:t1, range:[?,?], keep order:false"
]
}
]
}
]
18 changes: 12 additions & 6 deletions util/plancodec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,17 +338,13 @@ func EncodePlanNode(depth, pid int, planType string, rowCount float64,
}

// NormalizePlanNode is used to normalize the plan to a string.
func NormalizePlanNode(depth int, planType string, isRoot bool, explainInfo string, buf *bytes.Buffer) {
func NormalizePlanNode(depth int, planType string, taskTypeInfo string, explainInfo string, buf *bytes.Buffer) {
buf.WriteString(strconv.Itoa(depth))
buf.WriteByte(separator)
planID := TypeStringToPhysicalID(planType)
buf.WriteString(strconv.Itoa(planID))
buf.WriteByte(separator)
if isRoot {
buf.WriteString(rootTaskType)
} else {
buf.WriteString(copTaskType)
}
buf.WriteString(taskTypeInfo)
buf.WriteByte(separator)
buf.WriteString(explainInfo)
buf.WriteByte(lineBreaker)
Expand All @@ -367,6 +363,16 @@ func EncodeTaskType(isRoot bool, storeType kv.StoreType) string {
return copTaskType + idSeparator + strconv.Itoa((int)(storeType))
}

// EncodeTaskTypeForNormalize is used to encode task type to a string. Only use for normalize plan.
func EncodeTaskTypeForNormalize(isRoot bool, storeType kv.StoreType) string {
if isRoot {
return rootTaskType
} else if storeType == kv.TiKV {
return copTaskType
}
return copTaskType + idSeparator + strconv.Itoa((int)(storeType))
}

func decodeTaskType(str string) (string, error) {
segs := strings.Split(str, idSeparator)
if segs[0] == rootTaskType {
Expand Down