Skip to content

Commit

Permalink
planner: reset PhysicalWindow's schema for mpp plan. (#34764)
Browse files Browse the repository at this point in the history
* fix

* add failpoint

* add test

* sort import

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
LittleFall and ti-chi-bot authored May 18, 2022
1 parent d834649 commit 9b1ef78
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
25 changes: 21 additions & 4 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1891,12 +1892,28 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
return t
}

func (p *PhysicalWindow) attach2TaskForMPP(mpp *mppTask) task {
// FIXME: currently, tiflash's join has different schema with TiDB,
// so we have to rebuild the schema of join and operators which may inherit schema from join.
// for window, we take the sub-plan's schema, and the schema generated by windowDescs.
columns := p.Schema().Clone().Columns[len(p.Schema().Columns)-len(p.WindowFuncDescs):]
p.schema = expression.MergeSchema(mpp.plan().Schema(), expression.NewSchema(columns...))

failpoint.Inject("CheckMPPWindowSchemaLength", func() {
if len(p.Schema().Columns) != len(mpp.plan().Schema().Columns)+len(p.WindowFuncDescs) {
panic("mpp physical window has incorrect schema length")
}
})

// TODO: find a better way to solve the cost problem.
mpp.cst = mpp.cost() * 0.05
p.cost = mpp.cost()
return attachPlan2Task(p, mpp)
}

func (p *PhysicalWindow) attach2Task(tasks ...task) task {
if mpp, ok := tasks[0].copy().(*mppTask); ok && p.storeTp == kv.TiFlash {
// TODO: find a better way to solve the cost problem.
mpp.cst = mpp.cost() * 0.05
p.cost = mpp.cost()
return attachPlan2Task(p, mpp)
return p.attach2TaskForMPP(mpp)
}
t := tasks[0].convertToRootTask(p.ctx)
p.cost = t.cost()
Expand Down
21 changes: 21 additions & 0 deletions planner/core/window_push_down_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/model"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -122,3 +123,23 @@ func TestWindowPlanWithOtherOperators(t *testing.T) {
suiteData.GetTestCases(t, &input, &output)
testWithData(t, tk, input, output)
}

func TestIssue34765(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
dom := domain.GetDomain(tk.Session())

tk.MustExec("use test")
tk.MustExec("create table t1(c1 varchar(32), c2 datetime, c3 bigint, c4 varchar(64));")
tk.MustExec("create table t2(b2 varchar(64));")
tk.MustExec("set tidb_enforce_mpp=1;")
SetTiFlashReplica(t, dom, "test", "t1")
SetTiFlashReplica(t, dom, "test", "t2")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/CheckMPPWindowSchemaLength", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/CheckMPPWindowSchemaLength"))
}()
tk.MustExec("explain select count(*) from (select row_number() over (partition by c1 order by c2) num from (select * from t1 left join t2 on t1.c4 = t2.b2) tem2 ) tx where num = 1;")
}

0 comments on commit 9b1ef78

Please sign in to comment.