Skip to content

Commit

Permalink
*: push part of order prop to partitiontable and make auto-analyze co…
Browse files Browse the repository at this point in the history
…nfigurable (#37420)

ref #36108
  • Loading branch information
winoros authored Aug 29, 2022
1 parent 1a89dec commit 7c312d0
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 61 deletions.
4 changes: 2 additions & 2 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ func TestAlterTableDropPartitionByList(t *testing.T) {
);`)
tk.MustExec(`insert into t values (1),(3),(5),(null)`)
tk.MustExec(`alter table t drop partition p1`)
tk.MustQuery("select * from t").Check(testkit.Rows("1", "5", "<nil>"))
tk.MustQuery("select * from t order by id").Check(testkit.Rows("<nil>", "1", "5"))
ctx := tk.Session()
is := domain.GetDomain(ctx).InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
Expand Down Expand Up @@ -1867,7 +1867,7 @@ func TestAlterTableExchangePartition(t *testing.T) {
// test disable exchange partition
tk.MustExec("ALTER TABLE e EXCHANGE PARTITION p0 WITH TABLE e2")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Exchange Partition is disabled, please set 'tidb_enable_exchange_partition' if you need to need to enable it"))
tk.MustQuery("select * from e").Check(testkit.Rows("16", "1669", "337", "2005"))
tk.MustQuery("select * from e order by id").Check(testkit.Rows("16", "337", "1669", "2005"))
tk.MustQuery("select * from e2").Check(testkit.Rows())

// enable exchange partition
Expand Down
16 changes: 16 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetPartitionKeyRanges sets the "KeyRangesWithPartition" for "kv.Request".
func (builder *RequestBuilder) SetPartitionKeyRanges(keyRanges [][]kv.KeyRange) *RequestBuilder {
builder.Request.KeyRangesWithPartition = keyRanges
return builder
}

// SetStartTS sets "StartTS" for "kv.Request".
func (builder *RequestBuilder) SetStartTS(startTS uint64) *RequestBuilder {
builder.Request.StartTs = startTS
Expand Down Expand Up @@ -332,6 +338,16 @@ func (builder *RequestBuilder) verifyTxnScope() error {
return errors.New("requestBuilder can't decode tableID from keyRange")
}
}
for _, partKeyRanges := range builder.Request.KeyRangesWithPartition {
for _, keyRange := range partKeyRanges {
tableID := tablecodec.DecodeTableID(keyRange.StartKey)
if tableID > 0 {
visitPhysicalTableID[tableID] = struct{}{}
} else {
return errors.New("requestBuilder can't decode tableID from keyRange")
}
}
}

for phyTableID := range visitPhysicalTableID {
valid := VerifyTxnScope(builder.ReadReplicaScope, phyTableID, builder.is)
Expand Down
15 changes: 13 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -394,8 +395,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
oriScan := sctx.GetSessionVars().DistSQLScanConcurrency()
oriIndex := sctx.GetSessionVars().IndexSerialScanConcurrency()
oriIso, _ := sctx.GetSessionVars().GetSystemVar(variable.TxnIsolation)
terror.Log(sctx.GetSessionVars().SetSystemVar(variable.TiDBBuildStatsConcurrency, "1"))
sctx.GetSessionVars().SetDistSQLScanConcurrency(1)
autoConcurrency, err1 := variable.GetSessionOrGlobalSystemVar(sctx.GetSessionVars(), variable.TiDBAutoBuildStatsConcurrency)
terror.Log(err1)
if err1 == nil {
terror.Log(sctx.GetSessionVars().SetSystemVar(variable.TiDBBuildStatsConcurrency, autoConcurrency))
}
sVal, err2 := variable.GetSessionOrGlobalSystemVar(sctx.GetSessionVars(), variable.TiDBSysProcScanConcurrency)
terror.Log(err2)
if err2 == nil {
concurrency, err3 := strconv.ParseInt(sVal, 10, 64)
terror.Log(err3)
sctx.GetSessionVars().SetDistSQLScanConcurrency(int(concurrency))
}
sctx.GetSessionVars().SetIndexSerialScanConcurrency(1)
terror.Log(sctx.GetSessionVars().SetSystemVar(variable.TxnIsolation, ast.ReadCommitted))
defer func() {
Expand Down
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4118,16 +4118,16 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*r
return pids, ret, nil
}

func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error) {
var ret []kv.KeyRange
for _, p := range h.partitions {
func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error) {
ret := make([][]kv.KeyRange, len(h.partitions))
for i, p := range h.partitions {
pid := p.GetPhysicalID()
meta := p.Meta()
kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil)
if err != nil {
return nil, err
}
ret = append(ret, kvRange...)
ret[i] = append(ret[i], kvRange...)
}
return ret, nil
}
Expand Down
3 changes: 0 additions & 3 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,6 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error {
func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) {
sc := e.ctx.GetSessionVars().StmtCtx
if e.partitionTableMode {
if e.keepOrder { // this case should be prevented by the optimizer
return errors.New("invalid execution plan: cannot keep order when accessing a partition table by IndexLookUpReader")
}
e.feedback.Invalidate() // feedback for partition tables is not ready
e.partitionKVRanges = make([][]kv.KeyRange, 0, len(e.prunedPartitions))
for _, p := range e.prunedPartitions {
Expand Down
143 changes: 141 additions & 2 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ partition p2 values less than (10))`)
// Table reader: one partition
tk.MustQuery("select * from pt where c > 8").Check(testkit.Rows("9 9"))
// Table reader: more than one partition
tk.MustQuery("select * from pt where c < 2 or c >= 9").Check(testkit.Rows("0 0", "9 9"))
tk.MustQuery("select * from pt where c < 2 or c >= 9").Sort().Check(testkit.Rows("0 0", "9 9"))

// Index reader
tk.MustQuery("select c from pt").Sort().Check(testkit.Rows("0", "2", "4", "6", "7", "9", "<nil>"))
Expand All @@ -64,7 +64,7 @@ partition p2 values less than (10))`)
tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt").Sort().Check(testkit.Rows("0 0", "2 2", "4 4", "6 6", "7 7", "9 9", "<nil> <nil>"))
tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 4 and c > 10").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c > 8").Check(testkit.Rows("9 9"))
tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c < 2 or c >= 9").Check(testkit.Rows("0 0", "9 9"))
tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c < 2 or c >= 9").Sort().Check(testkit.Rows("0 0", "9 9"))

// Index Merge
tk.MustExec("set @@tidb_enable_index_merge = 1")
Expand Down Expand Up @@ -352,14 +352,67 @@ func TestOrderByandLimit(t *testing.T) {
// regular table
tk.MustExec("create table tregular(a int, b int, index idx_a(a))")

// range partition table with int pk
tk.MustExec(`create table trange_intpk(a int primary key, b int) partition by range(a) (
partition p0 values less than(300),
partition p1 values less than (500),
partition p2 values less than(1100));`)

// hash partition table with int pk
tk.MustExec("create table thash_intpk(a int primary key, b int) partition by hash(a) partitions 4;")

// regular table with int pk
tk.MustExec("create table tregular_intpk(a int primary key, b int)")

// range partition table with clustered index
tk.MustExec(`create table trange_clustered(a int, b int, primary key(a, b) clustered) partition by range(a) (
partition p0 values less than(300),
partition p1 values less than (500),
partition p2 values less than(1100));`)

// hash partition table with clustered index
tk.MustExec("create table thash_clustered(a int, b int, primary key(a, b) clustered) partition by hash(a) partitions 4;")

// regular table with clustered index
tk.MustExec("create table tregular_clustered(a int, b int, primary key(a, b) clustered)")

// generate some random data to be inserted
vals := make([]string, 0, 2000)
for i := 0; i < 2000; i++ {
vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000)))
}

dedupValsA := make([]string, 0, 2000)
dedupMapA := make(map[int]struct{}, 2000)
for i := 0; i < 2000; i++ {
valA := rand.Intn(1100)
if _, ok := dedupMapA[valA]; ok {
continue
}
dedupValsA = append(dedupValsA, fmt.Sprintf("(%v, %v)", valA, rand.Intn(2000)))
dedupMapA[valA] = struct{}{}
}

dedupValsAB := make([]string, 0, 2000)
dedupMapAB := make(map[string]struct{}, 2000)
for i := 0; i < 2000; i++ {
val := fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000))
if _, ok := dedupMapAB[val]; ok {
continue
}
dedupValsAB = append(dedupValsAB, val)
dedupMapAB[val] = struct{}{}
}

tk.MustExec("insert into trange values " + strings.Join(vals, ","))
tk.MustExec("insert into thash values " + strings.Join(vals, ","))
tk.MustExec("insert into tregular values " + strings.Join(vals, ","))
tk.MustExec("insert into trange_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into thash_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into tregular_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into trange_clustered values " + strings.Join(dedupValsAB, ","))
tk.MustExec("insert into thash_clustered values " + strings.Join(dedupValsAB, ","))
tk.MustExec("insert into tregular_clustered values " + strings.Join(dedupValsAB, ","))

// test indexLookUp
for i := 0; i < 100; i++ {
Expand All @@ -373,6 +426,29 @@ func TestOrderByandLimit(t *testing.T) {
tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
}

// test indexLookUp with order property pushed down.
for i := 0; i < 100; i++ {
// explain select * from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used
// select * from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result
x := rand.Intn(1099)
y := rand.Intn(2000) + 1
// Since we only use order by a not order by a, b, the result is not stable when we read both a and b.
// We cut the max element so that the result can be stable.
maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0]
queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y)
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult)
tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult)
}

// test tableReader
for i := 0; i < 100; i++ {
// explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used
Expand All @@ -385,6 +461,51 @@ func TestOrderByandLimit(t *testing.T) {
tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
}

// test tableReader with order property pushed down.
for i := 0; i < 100; i++ {
// explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used
// select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // it can return the correct result
x := rand.Intn(1099)
y := rand.Intn(2000) + 1
queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
require.False(t, tk.HasPlan(queryHashPartition, "Limit"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
tk.MustQuery(queryHashPartition).Sort().Check(regularResult)

// test int pk
// To be simplified, we only read column a.
queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryRegular = fmt.Sprintf("select a from tregular_intpk where a > %v order by a limit %v", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader"))
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
regularResult = tk.MustQuery(queryRegular).Rows()
tk.MustQuery(queryRangePartition).Check(regularResult)
tk.MustQuery(queryHashPartition).Check(regularResult)

// test clustered index
queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryRegular = fmt.Sprintf("select * from tregular_clustered where a > %v order by a, b limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
regularResult = tk.MustQuery(queryRegular).Rows()
tk.MustQuery(queryRangePartition).Check(regularResult)
tk.MustQuery(queryHashPartition).Check(regularResult)
}

// test indexReader
for i := 0; i < 100; i++ {
// explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used
Expand All @@ -397,6 +518,24 @@ func TestOrderByandLimit(t *testing.T) {
tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
}

// test indexReader with order property pushed down.
for i := 0; i < 100; i++ {
// explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used
// select a from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result
x := rand.Intn(1099)
y := rand.Intn(2000) + 1
queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "IndexReader")) // check if indexReader is used
require.True(t, tk.HasPlan(queryHashPartition, "IndexReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
}

// test indexMerge
for i := 0; i < 100; i++ {
// explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // check if IndexMerge is used
Expand Down
29 changes: 24 additions & 5 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Con
}

type kvRangeBuilder interface {
buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error)
buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error)
buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error)
}

Expand Down Expand Up @@ -196,13 +196,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
if err != nil {
return err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
if len(kvReq.KeyRanges) > 0 {
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
} else {
for _, kr := range kvReq.KeyRangesWithPartition {
e.kvRanges = append(e.kvRanges, kr...)
}
}
if len(secondPartRanges) != 0 {
kvReq, err = e.buildKVReq(ctx, secondPartRanges)
if err != nil {
return err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
if len(kvReq.KeyRanges) > 0 {
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
} else {
for _, kr := range kvReq.KeyRangesWithPartition {
e.kvRanges = append(e.kvRanges, kr...)
}
}
}
return nil
}
Expand Down Expand Up @@ -305,7 +317,14 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
if err != nil {
return nil, err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

if len(kvReq.KeyRanges) > 0 {
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
} else {
for _, kr := range kvReq.KeyRangesWithPartition {
e.kvRanges = append(e.kvRanges, kr...)
}
}

result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
Expand Down Expand Up @@ -391,7 +410,7 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
if err != nil {
return nil, err
}
reqBuilder = builder.SetKeyRanges(kvRange)
reqBuilder = builder.SetPartitionKeyRanges(kvRange)
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
Expand Down
4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ type Request struct {
Data []byte
KeyRanges []KeyRange

// KeyRangesWithPartition makes sure that the request is sent first by partition then by region.
// When the table is small, it's possible that multiple partitions are in the same region.
KeyRangesWithPartition [][]KeyRange

// For PartitionTableScan used by tiflash.
PartitionIDAndRanges []PartitionIDAndRanges

Expand Down
2 changes: 1 addition & 1 deletion planner/cascades/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
{
"name": "TestCascadePlannerHashedPartTable",
"cases": [
"select * from pt1"
"select * from pt1 order by a"
]
},
{
Expand Down
Loading

0 comments on commit 7c312d0

Please sign in to comment.