From 9b1ef7815a808373cabea2e1640cc9cc5a87fd18 Mon Sep 17 00:00:00 2001
From: Zhi Qi <30543181+LittleFall@users.noreply.github.com>
Date: Wed, 18 May 2022 15:00:34 +0800
Subject: [PATCH] planner: reset PhysicalWindow's schema for mpp plan. (#34764)

* fix

* add failpoint

* add test

* sort import

Co-authored-by: Ti Chi Robot
---
 planner/core/task.go                  | 25 +++++++++++++++++++++----
 planner/core/window_push_down_test.go | 21 +++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/planner/core/task.go b/planner/core/task.go
index 6684d26b80534..a60cbf2b7ebf0 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -18,6 +18,7 @@ import (
 	"math"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/expression/aggregation"
 	"github.com/pingcap/tidb/kv"
@@ -1891,12 +1892,28 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
 	return t
 }
 
+func (p *PhysicalWindow) attach2TaskForMPP(mpp *mppTask) task {
+	// FIXME: currently, tiflash's join has different schema with TiDB,
+	// so we have to rebuild the schema of join and operators which may inherit schema from join.
+	// for window, we take the sub-plan's schema, and the schema generated by windowDescs.
+	columns := p.Schema().Clone().Columns[len(p.Schema().Columns)-len(p.WindowFuncDescs):]
+	p.schema = expression.MergeSchema(mpp.plan().Schema(), expression.NewSchema(columns...))
+
+	failpoint.Inject("CheckMPPWindowSchemaLength", func() {
+		if len(p.Schema().Columns) != len(mpp.plan().Schema().Columns)+len(p.WindowFuncDescs) {
+			panic("mpp physical window has incorrect schema length")
+		}
+	})
+
+	// TODO: find a better way to solve the cost problem.
+	mpp.cst = mpp.cost() * 0.05
+	p.cost = mpp.cost()
+	return attachPlan2Task(p, mpp)
+}
+
 func (p *PhysicalWindow) attach2Task(tasks ...task) task {
 	if mpp, ok := tasks[0].copy().(*mppTask); ok && p.storeTp == kv.TiFlash {
-		// TODO: find a better way to solve the cost problem.
-		mpp.cst = mpp.cost() * 0.05
-		p.cost = mpp.cost()
-		return attachPlan2Task(p, mpp)
+		return p.attach2TaskForMPP(mpp)
 	}
 	t := tasks[0].convertToRootTask(p.ctx)
 	p.cost = t.cost()
diff --git a/planner/core/window_push_down_test.go b/planner/core/window_push_down_test.go
index 382f009a3f4ff..a8ec9c0955d56 100644
--- a/planner/core/window_push_down_test.go
+++ b/planner/core/window_push_down_test.go
@@ -18,6 +18,7 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/parser/model"
 	plannercore "github.com/pingcap/tidb/planner/core"
@@ -122,3 +123,23 @@ func TestWindowPlanWithOtherOperators(t *testing.T) {
 	suiteData.GetTestCases(t, &input, &output)
 	testWithData(t, tk, input, output)
 }
+
+func TestIssue34765(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	dom := domain.GetDomain(tk.Session())
+
+	tk.MustExec("use test")
+	tk.MustExec("create table t1(c1 varchar(32), c2 datetime, c3 bigint, c4 varchar(64));")
+	tk.MustExec("create table t2(b2 varchar(64));")
+	tk.MustExec("set tidb_enforce_mpp=1;")
+	SetTiFlashReplica(t, dom, "test", "t1")
+	SetTiFlashReplica(t, dom, "test", "t2")
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/CheckMPPWindowSchemaLength", "return"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/CheckMPPWindowSchemaLength"))
+	}()
+	tk.MustExec("explain select count(*) from (select row_number() over (partition by c1 order by c2) num from (select * from t1 left join t2 on t1.c4 = t2.b2) tem2 ) tx where num = 1;")
+}
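
The invariant guarded by the new CheckMPPWindowSchemaLength failpoint is that an MPP PhysicalWindow's output schema must be exactly the child plan's schema followed by one column per window function descriptor; the patch rebuilds the window's schema this way because TiFlash's join can change the column layout the window would otherwise inherit. The standalone Go sketch below only illustrates that merge and length check; Schema and mergeWindowSchema are hypothetical stand-ins, not TiDB's actual planner types.

```go
package main

import "fmt"

// Schema is a simplified stand-in for the planner's column schema:
// just an ordered list of column names.
type Schema struct {
	Columns []string
}

// mergeWindowSchema mirrors the idea of attach2TaskForMPP: the window's
// output schema is the child (sub-plan) schema followed by one column per
// window function, instead of a schema inherited from the rewritten join.
func mergeWindowSchema(child Schema, windowFuncCols []string) Schema {
	merged := append(append([]string{}, child.Columns...), windowFuncCols...)
	return Schema{Columns: merged}
}

func main() {
	// Child schema as produced by the pushed-down join of t1 and t2.
	child := Schema{Columns: []string{"c1", "c2", "c3", "c4", "b2"}}
	// One window function: row_number() over (partition by c1 order by c2).
	windowFuncCols := []string{"row_number_num"}

	win := mergeWindowSchema(child, windowFuncCols)

	// The same length invariant the CheckMPPWindowSchemaLength failpoint asserts:
	// len(window schema) == len(child schema) + number of window functions.
	if len(win.Columns) != len(child.Columns)+len(windowFuncCols) {
		panic("mpp physical window has incorrect schema length")
	}
	fmt.Println(win.Columns)
}
```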