Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: outer merge join cannot keep the prop of its inner child (#33359) #33375

Merged
merged 5 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,16 @@ func (p *LogicalJoin) getEnforcedMergeJoin(prop *property.PhysicalProperty, sche
return nil
}
for _, item := range prop.SortItems {
isExist := false
isExist, hasLeftColInProp, hasRightColInProp := false, false, false
for joinKeyPos := 0; joinKeyPos < len(leftJoinKeys); joinKeyPos++ {
var key *expression.Column
if item.Col.Equal(p.ctx, leftJoinKeys[joinKeyPos]) {
key = leftJoinKeys[joinKeyPos]
hasLeftColInProp = true
}
if item.Col.Equal(p.ctx, rightJoinKeys[joinKeyPos]) {
key = rightJoinKeys[joinKeyPos]
hasRightColInProp = true
}
if key == nil {
continue
Expand All @@ -314,6 +316,13 @@ func (p *LogicalJoin) getEnforcedMergeJoin(prop *property.PhysicalProperty, sche
if !isExist {
return nil
}
// If the output wants the order of the inner side. We should reject it since we might add null-extend rows of that side.
if p.JoinType == LeftOuterJoin && hasRightColInProp {
return nil
}
if p.JoinType == RightOuterJoin && hasLeftColInProp {
return nil
}
}
// Generate the enforced sort merge join
leftKeys := getNewJoinKeysByOffsets(leftJoinKeys, offsets)
Expand Down
317 changes: 317 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5226,3 +5226,320 @@ func (s *testIntegrationSuite) TestIssue31202(c *C) {
"└─TableFullScan 10000.00 cop[tikv] table:t31202 keep order:false, stats:pseudo"))
tk.MustExec("drop table if exists t31202")
}
<<<<<<< HEAD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need resolve conflicts

=======

func TestNaturalJoinUpdateSameTable(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("create database natural_join_update")
defer tk.MustExec("drop database natural_join_update")
tk.MustExec("use natural_join_update")
tk.MustExec("create table t1(a int, b int)")
tk.MustExec("insert into t1 values (1,1),(2,2)")
tk.MustExec("update t1 as a natural join t1 b SET a.a = 2, b.b = 3")
tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("2 3", "2 3"))
tk.MustExec("drop table t1")
tk.MustExec("create table t1 (a int primary key, b int)")
tk.MustExec("insert into t1 values (1,1),(2,2)")
tk.MustGetErrCode(`update t1 as a natural join t1 b SET a.a = 2, b.b = 3`, mysql.ErrMultiUpdateKeyConflict)
tk.MustExec("drop table t1")
tk.MustExec("create table t1 (a int, b int) partition by hash (a) partitions 3")
tk.MustExec("insert into t1 values (1,1),(2,2)")
tk.MustGetErrCode(`update t1 as a natural join t1 b SET a.a = 2, b.b = 3`, mysql.ErrMultiUpdateKeyConflict)
tk.MustExec("drop table t1")
tk.MustExec("create table t1 (A int, b int) partition by hash (b) partitions 3")
tk.MustExec("insert into t1 values (1,1),(2,2)")
tk.MustGetErrCode(`update t1 as a natural join t1 B SET a.A = 2, b.b = 3`, mysql.ErrMultiUpdateKeyConflict)
_, err := tk.Exec(`update t1 as a natural join t1 B SET a.A = 2, b.b = 3`)
require.Error(t, err)
require.Regexp(t, ".planner:1706.Primary key/partition key update is not allowed since the table is updated both as 'a' and 'B'.", err.Error())
tk.MustExec("drop table t1")
tk.MustExec("create table t1 (A int, b int) partition by RANGE COLUMNS (b) (partition `pNeg` values less than (0),partition `pPos` values less than MAXVALUE)")
tk.MustExec("insert into t1 values (1,1),(2,2)")
tk.MustGetErrCode(`update t1 as a natural join t1 B SET a.A = 2, b.b = 3`, mysql.ErrMultiUpdateKeyConflict)
tk.MustExec("drop table t1")
}

func TestAggPushToCopForCachedTable(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec(`create table t32157(
process_code varchar(8) NOT NULL,
ctrl_class varchar(2) NOT NULL,
ctrl_type varchar(1) NOT NULL,
oper_no varchar(12) DEFAULT NULL,
modify_date datetime DEFAULT NULL,
d_c_flag varchar(2) NOT NULL,
PRIMARY KEY (process_code,ctrl_class,d_c_flag));`)
tk.MustExec("insert into t32157 values ('GDEP0071', '05', '1', '10000', '2016-06-29 00:00:00', 'C')")
tk.MustExec("insert into t32157 values ('GDEP0071', '05', '0', '0000', '2016-06-01 00:00:00', 'D')")
tk.MustExec("alter table t32157 cache")

tk.MustQuery("explain format = 'brief' select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows(
"StreamAgg 1.00 root funcs:count(1)->Column#8]\n" +
"[└─UnionScan 10.00 root eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableReader 10.00 root data:Selection]\n" +
"[ └─Selection 10.00 cop[tikv] eq(test.t32157.process_code, \"GDEP0071\")]\n" +
"[ └─TableFullScan 10000.00 cop[tikv] table:t32157 keep order:false, stats:pseudo"))

var readFromCacheNoPanic bool
for i := 0; i < 10; i++ {
tk.MustQuery("select /*+AGG_TO_COP()*/ count(*) from t32157 ignore index(primary) where process_code = 'GDEP0071'").Check(testkit.Rows("2"))
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
readFromCacheNoPanic = true
break
}
}
require.True(t, readFromCacheNoPanic)

tk.MustExec("drop table if exists t31202")
}

func TestIssue31240(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t31240(a int, b int);")
tk.MustExec("set @@tidb_allow_mpp = 0")

tbl, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t31240", L: "t31240"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

var input []string
var output []struct {
SQL string
Plan []string
}
integrationSuiteData := core.GetIntegrationSuiteData()
integrationSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
tk.MustExec("drop table if exists t31240")
}

func TestIssue32632(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE `partsupp` (" +
" `PS_PARTKEY` bigint(20) NOT NULL," +
"`PS_SUPPKEY` bigint(20) NOT NULL," +
"`PS_AVAILQTY` bigint(20) NOT NULL," +
"`PS_SUPPLYCOST` decimal(15,2) NOT NULL," +
"`PS_COMMENT` varchar(199) NOT NULL," +
"PRIMARY KEY (`PS_PARTKEY`,`PS_SUPPKEY`) /*T![clustered_index] NONCLUSTERED */)")
tk.MustExec("CREATE TABLE `supplier` (" +
"`S_SUPPKEY` bigint(20) NOT NULL," +
"`S_NAME` char(25) NOT NULL," +
"`S_ADDRESS` varchar(40) NOT NULL," +
"`S_NATIONKEY` bigint(20) NOT NULL," +
"`S_PHONE` char(15) NOT NULL," +
"`S_ACCTBAL` decimal(15,2) NOT NULL," +
"`S_COMMENT` varchar(101) NOT NULL," +
"PRIMARY KEY (`S_SUPPKEY`) /*T![clustered_index] CLUSTERED */)")
tk.MustExec("analyze table partsupp;")
tk.MustExec("analyze table supplier;")
tk.MustExec("set @@tidb_enforce_mpp = 1")

tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "partsupp", L: "partsupp"})
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "supplier", L: "supplier"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}
tbl2.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

h := dom.StatsHandle()
statsTbl1 := h.GetTableStats(tbl1.Meta())
statsTbl1.Count = 800000
statsTbl2 := h.GetTableStats(tbl2.Meta())
statsTbl2.Count = 10000
var input []string
var output []struct {
SQL string
Plan []string
}
integrationSuiteData := core.GetIntegrationSuiteData()
integrationSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
tk.MustExec("drop table if exists partsupp")
tk.MustExec("drop table if exists supplier")
}

func TestTiFlashPartitionTableScan(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@tidb_enforce_mpp = on")
tk.MustExec("set @@tidb_allow_batch_cop = 2")
tk.MustExec("drop table if exists rp_t;")
tk.MustExec("drop table if exists hp_t;")
tk.MustExec("create table rp_t(a int) partition by RANGE (a) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16), PARTITION p3 VALUES LESS THAN (21));")
tk.MustExec("create table hp_t(a int) partition by hash(a) partitions 4;")
tbl1, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "rp_t", L: "rp_t"})
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "hp_t", L: "hp_t"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl1.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}
tbl2.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}
var input []string
var output []struct {
SQL string
Plan []string
}
integrationSuiteData := core.GetIntegrationSuiteData()
integrationSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
tk.MustExec("drop table rp_t;")
tk.MustExec("drop table hp_t;")
}

func TestIssue33175(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (id bigint(45) unsigned not null, c varchar(20), primary key(id));")
tk.MustExec("insert into t values (9734095886065816707, 'a'), (10353107668348738101, 'b'), (0, 'c');")
tk.MustExec("begin")
tk.MustExec("insert into t values (33, 'd');")
tk.MustQuery("select max(id) from t;").Check(testkit.Rows("10353107668348738101"))
tk.MustExec("rollback")

tk.MustExec("alter table t cache")
for {
tk.MustQuery("select max(id) from t;").Check(testkit.Rows("10353107668348738101"))
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
break
}
}

// // With subquery, like the original issue case.
for {
tk.MustQuery("select * from t where id > (select max(id) from t where t.id > 0);").Check(testkit.Rows())
if tk.Session().GetSessionVars().StmtCtx.ReadFromTableCache {
break
}
}

// Test order by desc / asc.
tk.MustQuery("select id from t order by id desc;").Check(testkit.Rows(
"10353107668348738101",
"9734095886065816707",
"0"))

tk.MustQuery("select id from t order by id asc;").Check(testkit.Rows(
"0",
"9734095886065816707",
"10353107668348738101"))

tk.MustExec("alter table t nocache")
tk.MustExec("drop table t")

// Cover more code that use union scan
// TableReader/IndexReader/IndexLookup
for idx, q := range []string{
"create temporary table t (id bigint unsigned, c int default null, index(id))",
"create temporary table t (id bigint unsigned primary key)",
} {
tk.MustExec(q)
tk.MustExec("insert into t(id) values (1), (3), (9734095886065816707), (9734095886065816708)")
tk.MustQuery("select min(id) from t").Check(testkit.Rows("1"))
tk.MustQuery("select max(id) from t").Check(testkit.Rows("9734095886065816708"))
tk.MustQuery("select id from t order by id asc").Check(testkit.Rows(
"1", "3", "9734095886065816707", "9734095886065816708"))
tk.MustQuery("select id from t order by id desc").Check(testkit.Rows(
"9734095886065816708", "9734095886065816707", "3", "1"))
if idx == 0 {
tk.MustQuery("select * from t order by id asc").Check(testkit.Rows(
"1 <nil>",
"3 <nil>",
"9734095886065816707 <nil>",
"9734095886065816708 <nil>"))
tk.MustQuery("select * from t order by id desc").Check(testkit.Rows(
"9734095886065816708 <nil>",
"9734095886065816707 <nil>",
"3 <nil>",
"1 <nil>"))
}
tk.MustExec("drop table t")
}

// More and more test
tk.MustExec("create global temporary table `tmp1` (id bigint unsigned primary key) on commit delete rows;")
tk.MustExec("begin")
tk.MustExec("insert into tmp1 values (0),(1),(2),(65536),(9734095886065816707),(9734095886065816708);")
tk.MustQuery("select * from tmp1 where id <= 65534 or (id > 65535 and id < 9734095886065816700) or id >= 9734095886065816707 order by id desc;").Check(testkit.Rows(
"9734095886065816708", "9734095886065816707", "65536", "2", "1", "0"))

tk.MustQuery("select * from tmp1 where id <= 65534 or (id > 65535 and id < 9734095886065816700) or id >= 9734095886065816707 order by id asc;").Check(testkit.Rows(
"0", "1", "2", "65536", "9734095886065816707", "9734095886065816708"))

tk.MustExec("create global temporary table `tmp2` (id bigint primary key) on commit delete rows;")
tk.MustExec("begin")
tk.MustExec("insert into tmp2 values(-2),(-1),(0),(1),(2);")
tk.MustQuery("select * from tmp2 where id <= -1 or id > 0 order by id desc;").Check(testkit.Rows("2", "1", "-1", "-2"))
tk.MustQuery("select * from tmp2 where id <= -1 or id > 0 order by id asc;").Check(testkit.Rows("-2", "-1", "1", "2"))
}

func TestIssue33042(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t1(id int primary key, col1 int)")
tk.MustExec("create table t2(id int primary key, col1 int)")
tk.MustQuery("explain format='brief' SELECT /*+ merge_join(t1, t2)*/ * FROM (t1 LEFT JOIN t2 ON t1.col1=t2.id) order by t2.id;").Check(
testkit.Rows(
"Sort 12500.00 root test.t2.id",
"└─MergeJoin 12500.00 root left outer join, left key:test.t1.col1, right key:test.t2.id",
" ├─TableReader(Build) 10000.00 root data:TableFullScan",
" │ └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:true, stats:pseudo",
" └─Sort(Probe) 10000.00 root test.t1.col1",
" └─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
),
)
}
>>>>>>> 1287eab59... planner: outer merge join cannot keep the prop of its inner child (#33359)
4 changes: 2 additions & 2 deletions planner/core/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@
"└─IndexRangeScan 20.00 cop[tikv] table:tt, index:a(a) range:[10,10], [20,20], keep order:false, stats:pseudo"
],
"Warnings": [
"Warning 1105 IndexMerge is inapplicable."
"Warning 1105 IndexMerge is inapplicable"
]
},
{
Expand All @@ -1342,7 +1342,7 @@
"└─IndexRangeScan 6666.67 cop[tikv] table:tt, index:a(a) range:[-inf,10), [15,15], (20,+inf], keep order:false, stats:pseudo"
],
"Warnings": [
"Warning 1105 IndexMerge is inapplicable."
"Warning 1105 IndexMerge is inapplicable"
]
}
]
Expand Down
2 changes: 1 addition & 1 deletion planner/core/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@
},
{
"SQL": "select /*+ TIDB_SMJ(t1,t2,t3)*/ * from t t1 left outer join t t2 on t1.a = t2.a left outer join t t3 on t2.a = t3.a",
"Best": "MergeLeftOuterJoin{MergeLeftOuterJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)->TableReader(Table(t))}(test.t.a,test.t.a)"
"Best": "MergeLeftOuterJoin{MergeLeftOuterJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)->Sort->TableReader(Table(t))}(test.t.a,test.t.a)"
},
{
"SQL": "select /*+ TIDB_SMJ(t1,t2,t3)*/ * from t t1 left outer join t t2 on t1.a = t2.a left outer join t t3 on t1.a = t3.a",
Expand Down