Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: reset PhysicalWindow's schema for mpp plan. #34764

Merged
merged 6 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1891,12 +1892,28 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
return t
}

func (p *PhysicalWindow) attach2TaskForMPP(mpp *mppTask) task {
// FIXME: currently, tiflash's join has different schema with TiDB,
// so we have to rebuild the schema of join and operators which may inherit schema from join.
// for window, we take the sub-plan's schema, and the schema generated by windowDescs.
columns := p.Schema().Clone().Columns[len(p.Schema().Columns)-len(p.WindowFuncDescs):]
p.schema = expression.MergeSchema(mpp.plan().Schema(), expression.NewSchema(columns...))

failpoint.Inject("CheckMPPWindowSchemaLength", func() {
if len(p.Schema().Columns) != len(mpp.plan().Schema().Columns)+len(p.WindowFuncDescs) {
panic("mpp physical window has incorrect schema length")
}
})

// TODO: find a better way to solve the cost problem.
mpp.cst = mpp.cost() * 0.05
p.cost = mpp.cost()
return attachPlan2Task(p, mpp)
}

func (p *PhysicalWindow) attach2Task(tasks ...task) task {
if mpp, ok := tasks[0].copy().(*mppTask); ok && p.storeTp == kv.TiFlash {
// TODO: find a better way to solve the cost problem.
mpp.cst = mpp.cost() * 0.05
p.cost = mpp.cost()
return attachPlan2Task(p, mpp)
return p.attach2TaskForMPP(mpp)
}
t := tasks[0].convertToRootTask(p.ctx)
p.cost = t.cost()
Expand Down
21 changes: 21 additions & 0 deletions planner/core/window_push_down_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/model"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -122,3 +123,23 @@ func TestWindowPlanWithOtherOperators(t *testing.T) {
suiteData.GetTestCases(t, &input, &output)
testWithData(t, tk, input, output)
}

func TestIssue34765(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
dom := domain.GetDomain(tk.Session())

tk.MustExec("use test")
tk.MustExec("create table t1(c1 varchar(32), c2 datetime, c3 bigint, c4 varchar(64));")
tk.MustExec("create table t2(b2 varchar(64));")
tk.MustExec("set tidb_enforce_mpp=1;")
SetTiFlashReplica(t, dom, "test", "t1")
SetTiFlashReplica(t, dom, "test", "t2")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/CheckMPPWindowSchemaLength", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/CheckMPPWindowSchemaLength"))
}()
tk.MustExec("explain select count(*) from (select row_number() over (partition by c1 order by c2) num from (select * from t1 left join t2 on t1.c4 = t2.b2) tem2 ) tx where num = 1;")
}