diff --git a/executor/builder.go b/executor/builder.go
index e431e4b358fc6..728a0e624d31e 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -4071,6 +4071,9 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
 		isCorColInTableFilter:    isCorColInTableFilter,
 		isCorColInPartialAccess:  isCorColInPartialAccess,
 		isIntersection:           v.IsIntersectionType,
+		byItems:                  v.ByItems,
+		pushedLimit:              v.PushedLimit,
+		keepOrder:                v.KeepOrder,
 	}
 	collectTable := false
 	e.tableRequest.CollectRangeCounts = &collectTable
diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go
index c0142dc0cab88..5dfd579f45d0f 100644
--- a/executor/index_merge_reader.go
+++ b/executor/index_merge_reader.go
@@ -16,9 +16,11 @@ package executor
 import (
 	"bytes"
+	"container/heap"
 	"context"
 	"fmt"
 	"runtime/trace"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -27,13 +29,16 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/distsql"
+	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/terror"
 	plannercore "github.com/pingcap/tidb/planner/core"
+	plannerutil "github.com/pingcap/tidb/planner/util"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/execdetails"
@@ -83,6 +88,11 @@ type IndexMergeReaderExecutor struct {
 	dagPBs       []*tipb.DAGRequest
 	startTS      uint64
 	tableRequest *tipb.DAGRequest
+
+	keepOrder   bool
+	pushedLimit *plannercore.PushedDownLimit
+	byItems     []*plannerutil.ByItems
+
 	// columns are only required by union scan.
 	columns []*model.ColumnInfo
 	*dataReaderBuilder
@@ -136,6 +146,9 @@ type indexMergeTableTask struct {
 	// parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection.
 	parTblIdx int
+
+	// partialPlanID is only used by indexMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit.
+	partialPlanID int
 }

 // Table implements the dataSourceExecutor interface.
@@ -281,6 +294,8 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
 		func() {
 			if e.isIntersection {
 				idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
+			} else if !e.partitionTableMode && e.pushedLimit != nil && len(e.byItems) != 0 {
+				idxMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit(ctx, fetch, workCh, e.resultCh, e.finished)
 			} else {
 				idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
 			}
@@ -340,6 +355,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
 			maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
 			maxChunkSize: e.maxChunkSize,
 			memTracker:   e.memTracker,
+			byItems:      e.byItems,
 		}

 		if e.isCorColInPartialFilters[workID] {
@@ -393,7 +409,8 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
 				syncErr(ctx, e.finished, fetchCh, err)
 				return
 			}
-			result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID))
+			result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, worker.getRetTpsForIndexScan(e.handleCols),
+				e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID))
 			if err != nil {
 				syncErr(ctx, e.finished, fetchCh, err)
 				return
@@ -410,7 +427,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
 			// fetch all data from this partition
 			ctx1, cancel := context.WithCancel(ctx)
-			_, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx)
+			_, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx, workID)
 			if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again
 				e.feedbacks[workID].Invalidate()
 			}
@@ -472,6 +489,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
 			plans:       e.partialPlans[workID],
 			ranges:      e.ranges[workID],
 			netDataSize: e.partialNetDataSizes[workID],
+			keepOrder:   ts.KeepOrder,
 		}

 		worker := &partialTableWorker{
@@ -482,6 +500,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
 			maxChunkSize: e.maxChunkSize,
 			tableReader:  partialTableReader,
 			memTracker:   e.memTracker,
+			byItems:      e.byItems,
 		}

 		if e.isCorColInPartialFilters[workID] {
@@ -528,7 +547,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
 			// fetch all handles from this table
 			ctx1, cancel := context.WithCancel(ctx)
-			_, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx)
+			_, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.finished, e.handleCols, parTblIdx, workID)
 			if fetchErr != nil { // this error is synced in fetchHandles, so don't sync it again
 				e.feedbacks[workID].Invalidate()
 			}
@@ -582,11 +601,12 @@ type partialTableWorker struct {
 	tableReader Executor
 	partition   table.PhysicalTable // it indicates if this worker is accessing a particular partition table
 	memTracker  *memory.Tracker
+	byItems     []*plannerutil.ByItems
 }

 func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask,
-	finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int) (count int64, err error) {
-	chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool)
+	finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int, partialPlanIndex int) (count int64, err error) {
+	chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(w.getRetTpsForTableScan(), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool)
 	for {
 		start := time.Now()
 		handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols)
@@ -598,7 +618,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
 			return count, nil
 		}
 		count += int64(len(handles))
-		task := w.buildTableTask(handles, retChunk, parTblIdx)
+		task := w.buildTableTask(handles, retChunk, parTblIdx, partialPlanIndex)
 		if w.stats != nil {
 			atomic.AddInt64(&w.stats.FetchIdxTime, int64(time.Since(start)))
 		}
@@ -614,6 +634,10 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
 	}
 }

+func (w *partialTableWorker) getRetTpsForTableScan() []*types.FieldType {
+	return retTypes(w.tableReader)
+}
+
 func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) (
 	handles []kv.Handle, retChk *chunk.Chunk, err error) {
 	handles = make([]kv.Handle, 0, w.batchSize)
@@ -645,6 +669,13 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 			}
 			handles = append(handles, handle)
 		}
+		// used for the embedded limit.
+		if len(w.byItems) != 0 {
+			if retChk == nil {
+				retChk = chunk.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.batchSize)
+			}
+			retChk.Append(chk, 0, chk.NumRows())
+		}
 	}
 	w.batchSize *= 2
 	if w.batchSize > w.maxBatchSize {
@@ -653,7 +684,7 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 	return handles, retChk, nil
 }

-func (w *partialTableWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int) *indexMergeTableTask {
+func (w *partialTableWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int, partialPlanID int) *indexMergeTableTask {
 	task := &indexMergeTableTask{
 		lookupTableTask: lookupTableTask{
 			handles: handles,
@@ -661,7 +692,8 @@ func (w *partialTableWorker) buildTableTask(handles []kv.Handle, retChk *chunk.C
 			partitionTable: w.partition,
 		},

-		parTblIdx: parTblIdx,
+		parTblIdx:     parTblIdx,
+		partialPlanID: partialPlanID,
 	}

 	task.doneCh = make(chan error, 1)
@@ -837,6 +869,192 @@ type indexMergeProcessWorker struct {
 	stats *IndexMergeRuntimeStat
 }

+type rowIdx struct {
+	partialID int
+	taskID    int
+	rowID     int
+}
+
+type handleHeap struct {
+	requiredCnt uint64
+	taskMap     map[int][]*indexMergeTableTask
+
+	idx         []rowIdx
+	compareFunc []chunk.CompareFunc
+	byItems     []*plannerutil.ByItems
+}
+
+func (h handleHeap) Len() int {
+	return len(h.idx)
+}
+
+func (h handleHeap) Less(i, j int) bool {
+	rowI := h.taskMap[h.idx[i].partialID][h.idx[i].taskID].idxRows.GetRow(h.idx[i].rowID)
+	rowJ := h.taskMap[h.idx[j].partialID][h.idx[j].taskID].idxRows.GetRow(h.idx[j].rowID)
+
+	for i, compFunc := range h.compareFunc {
+		cmp := compFunc(rowI, i, rowJ, i)
+		if !h.byItems[i].Desc {
+			cmp = -cmp
+		}
+		if cmp < 0 {
+			return true
+		} else if cmp > 0 {
+			return false
+		}
+	}
+	return false
+}
+
+func (h handleHeap) Swap(i, j int) {
+	h.idx[i], h.idx[j] = h.idx[j], h.idx[i]
+}
+
+func (h *handleHeap) Push(x interface{}) {
+	idx := x.(rowIdx)
+	h.idx = append(h.idx, idx)
+}
+
+func (h *handleHeap) Pop() interface{} {
+	idxRet := h.idx[len(h.idx)-1]
+	h.idx = h.idx[:len(h.idx)-1]
+	return idxRet
+}
+
+func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask) *handleHeap {
+	compareFuncs := make([]chunk.CompareFunc, 0, len(w.indexMerge.byItems))
+	for _, item := range w.indexMerge.byItems {
+		keyType := item.Expr.GetType()
+		compareFuncs = append(compareFuncs, chunk.GetCompareFunc(keyType))
+	}
+	requiredCnt := w.indexMerge.pushedLimit.Count + w.indexMerge.pushedLimit.Offset
+	return &handleHeap{
+		requiredCnt: requiredCnt,
+		taskMap:     taskMap,
+		idx:         make([]rowIdx, 0, requiredCnt),
+		compareFunc: compareFuncs,
+		byItems:     w.indexMerge.byItems,
+	}
+}
+
+// pruneTableWorkerTaskIdxRows prunes idxRows and keeps only the columns that are used in byItems.
+// e.g. if the common handle is (`b`, `c`) and we order by column `c`, column `c` must be moved to the front.
+func (w *indexMergeProcessWorker) pruneTableWorkerTaskIdxRows(task *indexMergeTableTask) {
+	// An IndexScan does not need to prune retChk: the columns required by byItems always come first.
+	if plan, ok := w.indexMerge.partialPlans[task.partialPlanID][0].(*plannercore.PhysicalTableScan); ok {
+		prune := make([]int, 0, len(w.indexMerge.byItems))
+		for _, item := range plan.ByItems {
+			c, _ := item.Expr.(*expression.Column)
+			idx := plan.Schema().ColumnIndex(c)
+			// idx should never be -1 here; if it is, just let it panic.
+			prune = append(prune, idx)
+		}
+		task.idxRows = task.idxRows.Prune(prune)
+	}
+}
+
+func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
+	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
+	memTracker := memory.NewTracker(w.indexMerge.id, -1)
+	memTracker.AttachTo(w.indexMerge.memTracker)
+	defer memTracker.Detach()
+	defer close(workCh)
+
+	if w.stats != nil {
+		start := time.Now()
+		defer func() {
+			w.stats.IndexMergeProcess += time.Since(start)
+		}()
+	}
+
+	distinctHandles := kv.NewHandleMap()
+	taskMap := make(map[int][]*indexMergeTableTask)
+	uselessMap := make(map[int]struct{})
+	taskHeap := w.NewHandleHeap(taskMap)
+	memTracker.Consume(int64(taskHeap.requiredCnt) * int64(unsafe.Sizeof(rowIdx{0, 0, 0})))
+
+	for task := range fetchCh {
+		if _, ok := uselessMap[task.partialPlanID]; ok {
+			continue
+		}
+		if _, ok := taskMap[task.partialPlanID]; !ok {
+			taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0)
+		}
+		w.pruneTableWorkerTaskIdxRows(task)
+		taskMap[task.partialPlanID] = append(taskMap[task.partialPlanID], task)
+		for i, h := range task.handles {
+			if _, ok := distinctHandles.Get(h); !ok {
+				distinctHandles.Set(h, true)
+				heap.Push(taskHeap, rowIdx{task.partialPlanID, len(taskMap[task.partialPlanID]) - 1, i})
+				if taskHeap.Len() > int(taskHeap.requiredCnt) {
+					top := heap.Pop(taskHeap).(rowIdx)
+					if top.partialID == task.partialPlanID && top.taskID == len(taskMap[task.partialPlanID])-1 && top.rowID == i {
+						uselessMap[task.partialPlanID] = struct{}{}
+						task.handles = task.handles[:i]
+						break
+					}
+				}
+			}
+			memTracker.Consume(int64(h.MemUsage()))
+		}
+		memTracker.Consume(task.idxRows.MemoryUsage())
+		if len(uselessMap) == len(w.indexMerge.partialPlans) {
+			// consume the rest of the tasks
+			go func() {
+				for range fetchCh {
+				}
+			}()
+			break
+		}
+	}
+
+	needCount := mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
+	if needCount == 0 {
+		return
+	}
+	fhs := make([]kv.Handle, needCount)
+	for i := needCount - 1; i >= 0; i-- {
+		idx := heap.Pop(taskHeap).(rowIdx)
+		fhs[i] = taskMap[idx.partialID][idx.taskID].handles[idx.rowID]
+	}
+
+	batchSize := w.indexMerge.ctx.GetSessionVars().IndexLookupSize
+	tasks := make([]*indexMergeTableTask, 0, len(fhs)/batchSize+1)
+	for len(fhs) > 0 {
+		l := mathutil.Min(len(fhs), batchSize)
+		// Save the index order.
+		indexOrder := kv.NewHandleMap()
+		for i, h := range fhs[:l] {
+			indexOrder.Set(h, i)
+		}
+		tasks = append(tasks, &indexMergeTableTask{
+			lookupTableTask: lookupTableTask{
+				handles:    fhs[:l],
+				indexOrder: indexOrder,
+				doneCh:     make(chan error, 1),
+			},
+		})
+		fhs = fhs[l:]
+	}
+	for _, task := range tasks {
+		select {
+		case <-ctx.Done():
+			return
+		case <-finished:
+			return
+		case workCh <- task:
+			select {
+			case <-ctx.Done():
+				return
+			case <-finished:
+				return
+			case resultCh <- task:
+				continue
+			}
+		}
+	}
+}
+
 func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
 	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
 	failpoint.Inject("testIndexMergeResultChCloseEarly", func(_ failpoint.Value) {
@@ -1160,6 +1378,7 @@ type partialIndexWorker struct {
 	maxChunkSize int
 	partition    table.PhysicalTable // it indicates if this worker is accessing a particular partition table
 	memTracker   *memory.Tracker
+	byItems      []*plannerutil.ByItems
 }

 func syncErr(ctx context.Context, finished <-chan struct{}, errCh chan<- *indexMergeTableTask, err error) {
@@ -1190,8 +1409,9 @@ func (w *partialIndexWorker) fetchHandles(
 	fetchCh chan<- *indexMergeTableTask,
 	finished <-chan struct{},
 	handleCols plannercore.HandleCols,
-	parTblIdx int) (count int64, err error) {
-	chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize)
+	parTblIdx int,
+	partialPlanIndex int) (count int64, err error) {
+	chk := chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols), w.maxChunkSize)
 	for {
 		start := time.Now()
 		handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols)
@@ -1203,7 +1423,7 @@ func (w *partialIndexWorker) fetchHandles(
 			return count, nil
 		}
 		count += int64(len(handles))
-		task := w.buildTableTask(handles, retChunk, parTblIdx)
+		task := w.buildTableTask(handles, retChunk, parTblIdx, partialPlanIndex)
 		if w.stats != nil {
 			atomic.AddInt64(&w.stats.FetchIdxTime, int64(time.Since(start)))
 		}
@@ -1219,6 +1439,16 @@ func (w *partialIndexWorker) fetchHandles(
 	}
 }

+func (w *partialIndexWorker) getRetTpsForIndexScan(handleCols plannercore.HandleCols) []*types.FieldType {
+	var tps []*types.FieldType
+	if len(w.byItems) != 0 {
+		for _, item := range w.byItems {
+			tps = append(tps, item.Expr.GetType())
+		}
+	}
+	return append(tps, handleCols.GetFieldsTypes()...)
+}
+
 func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) (
 	handles []kv.Handle, retChk *chunk.Chunk, err error) {
 	handles = make([]kv.Handle, 0, w.batchSize)
@@ -1250,6 +1480,13 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 			}
 			handles = append(handles, handle)
 		}
+		// used for the embedded limit.
+		if len(w.byItems) != 0 {
+			if retChk == nil {
+				retChk = chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols), w.batchSize)
+			}
+			retChk.Append(chk, 0, chk.NumRows())
+		}
 	}
 	w.batchSize *= 2
 	if w.batchSize > w.maxBatchSize {
@@ -1258,7 +1495,7 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 	return handles, retChk, nil
 }

-func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int) *indexMergeTableTask {
+func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int, partialPlanID int) *indexMergeTableTask {
 	task := &indexMergeTableTask{
 		lookupTableTask: lookupTableTask{
 			handles: handles,
@@ -1266,7 +1503,8 @@ func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.C
 			partitionTable: w.partition,
 		},

-		parTblIdx: parTblIdx,
+		parTblIdx:     parTblIdx,
+		partialPlanID: partialPlanID,
 	}

 	task.doneCh = make(chan error, 1)
@@ -1385,6 +1623,19 @@ func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *index
 		}
 	}

+	if w.indexMergeExec.keepOrder {
+		task.rowIdx = make([]int, 0, len(task.rows))
+		for _, row := range task.rows {
+			handle, err := w.indexMergeExec.handleCols.BuildHandleFromIndexRow(row)
+			if err != nil {
+				return err
+			}
+			rowIdx, _ := task.indexOrder.Get(handle)
+			task.rowIdx = append(task.rowIdx, rowIdx.(int))
+		}
+		sort.Sort(task)
+	}
+
 	memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{}))
 	task.memUsage += memUsage
 	task.memTracker.Consume(memUsage)
diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go
index 46a1206460074..e6d8f93fbb3f9 100644
--- a/executor/index_merge_reader_test.go
+++ b/executor/index_merge_reader_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/testkit/testutil"
 	"github.com/pingcap/tidb/util"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
 )

 func TestSingleTableRead(t *testing.T) {
@@ -937,3 +938,105 @@ func TestIndexMergeCoprGoroutinesLeak(t *testing.T) {
 	require.Contains(t, err.Error(), "testIndexMergePartialIndexWorkerCoprLeak")
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergePartialIndexWorkerCoprLeak"))
 }
+
+type valueStruct struct {
+	a int
+	b int
+	c int
+}
+
+func getResult(values []*valueStruct, a int, b int, limit int, desc bool) []*valueStruct {
+	ret := make([]*valueStruct, 0)
+	for _, value := range values {
+		if value.a == a || value.b == b {
+			ret = append(ret, value)
+		}
+	}
+	slices.SortFunc(ret, func(a, b *valueStruct) bool {
+		if desc {
+			return a.c > b.c
+		}
+		return a.c < b.c
+	})
+	if len(ret) > limit {
+		return ret[:limit]
+	}
+	return ret
+}
+
+func TestOrderByWithLimit(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+
+	tk.MustExec("create table thandle(a int, b int, c int, index idx_ac(a, c), index idx_bc(b, c))")
+	tk.MustExec("create table tpk(a int, b int, c int, d int auto_increment, primary key(d), index idx_ac(a, c), index idx_bc(b, c))")
+	tk.MustExec("create table tcommon(a int, b int, c int, d int auto_increment, primary key(a, c, d), index idx_ac(a, c), index idx_bc(b, c))")
+	tk.MustExec("create table thash(a int, b int, c int, index idx_ac(a, c), index idx_bc(b, c)) PARTITION BY HASH (`a`) PARTITIONS 4")
+
+	valueSlice := make([]*valueStruct, 0, 2000)
+	for i := 0; i < 2000; i++ {
+		a := rand.Intn(32)
+		b := rand.Intn(32)
+		c := rand.Intn(32)
+		tk.MustExec(fmt.Sprintf("insert into thandle values (%v, %v, %v)", a, b, c))
+		tk.MustExec(fmt.Sprintf("insert into tpk(a,b,c) values (%v, %v, %v)", a, b, c))
+		tk.MustExec(fmt.Sprintf("insert into tcommon(a,b,c) values (%v, %v, %v)", a, b, c))
+		tk.MustExec(fmt.Sprintf("insert into thash(a,b,c) values (%v, %v, %v)", a, b, c))
+		valueSlice = append(valueSlice, &valueStruct{a, b, c})
+	}
+
+	tk.MustExec("analyze table thandle")
+	tk.MustExec("analyze table tpk")
+	tk.MustExec("analyze table tcommon")
+	tk.MustExec("analyze table thash")
+
+	for i := 0; i < 100; i++ {
+		a := rand.Intn(32)
+		b := rand.Intn(32)
+		limit := rand.Intn(10) + 1
+		queryHandle := fmt.Sprintf("select /*+ use_index_merge(thandle, idx_ac, idx_bc) */ * from thandle where a = %v or b = %v order by c limit %v", a, b, limit)
+		resHandle := tk.MustQuery(queryHandle).Rows()
+		require.True(t, tk.HasPlan(queryHandle, "IndexMerge"))
+		require.False(t, tk.HasPlan(queryHandle, "TopN"))
+
+		queryPK := fmt.Sprintf("select /*+ use_index_merge(tpk, idx_ac, idx_bc) */ * from tpk where a = %v or b = %v order by c limit %v", a, b, limit)
+		resPK := tk.MustQuery(queryPK).Rows()
+		require.True(t, tk.HasPlan(queryPK, "IndexMerge"))
+		require.False(t, tk.HasPlan(queryPK, "TopN"))
+
+		queryCommon := fmt.Sprintf("select /*+ use_index_merge(tcommon, idx_ac, idx_bc) */ * from tcommon where a = %v or b = %v order by c limit %v", a, b, limit)
+		resCommon := tk.MustQuery(queryCommon).Rows()
+		require.True(t, tk.HasPlan(queryCommon, "IndexMerge"))
+		require.False(t, tk.HasPlan(queryCommon, "TopN"))
+
+		queryTableScan := fmt.Sprintf("select /*+ use_index_merge(tcommon, primary, idx_bc) */ * from tcommon where a = %v or b = %v order by c limit %v", a, b, limit)
+		resTableScan := tk.MustQuery(queryTableScan).Rows()
+		require.True(t, tk.HasPlan(queryTableScan, "IndexMerge"))
+		require.True(t, tk.HasPlan(queryTableScan, "TableRangeScan"))
+		require.False(t, tk.HasPlan(queryTableScan, "TopN"))
+
+		queryHash := fmt.Sprintf("select /*+ use_index_merge(thash, idx_ac, idx_bc) */ * from thash where a = %v or b = %v order by c limit %v", a, b, limit)
+		resHash := tk.MustQuery(queryHash).Rows()
+		require.True(t, tk.HasPlan(queryHash, "IndexMerge"))
+		// Partition tables are not supported yet, so a TopN is still expected in the plan.
+		require.True(t, tk.HasPlan(queryHash, "TopN"))
+
+		sliceRes := getResult(valueSlice, a, b, limit, false)
+
+		require.Equal(t, len(sliceRes), len(resHandle))
+		require.Equal(t, len(sliceRes), len(resPK))
+		require.Equal(t, len(sliceRes), len(resCommon))
+		require.Equal(t, len(sliceRes), len(resTableScan))
+		require.Equal(t, len(sliceRes), len(resHash))
+		for i := range sliceRes {
+			expectValue := fmt.Sprintf("%v", sliceRes[i].c)
+			// Only check column `c`.
+			require.Equal(t, expectValue, resHandle[i][2])
+			require.Equal(t, expectValue, resPK[i][2])
+			require.Equal(t, expectValue, resCommon[i][2])
+			require.Equal(t, expectValue, resTableScan[i][2])
+			require.Equal(t, expectValue, resHash[i][2])
+		}
+	}
+}
diff --git a/planner/core/casetest/physical_plan_test.go b/planner/core/casetest/physical_plan_test.go
index 3adea443c5a07..c0fa76d5a19b7 100644
--- a/planner/core/casetest/physical_plan_test.go
+++ b/planner/core/casetest/physical_plan_test.go
@@ -2464,6 +2464,7 @@ func TestIndexMergeOrderPushDown(t *testing.T) {
 	tk.MustExec("use test")
 	tk.MustExec("set tidb_cost_model_version=1")
 	tk.MustExec("create table t (a int, b int, c int, index idx(a, c), index idx2(b, c))")
+	tk.MustExec("create table tcommon (a int, b int, c int, primary key(a, c), index idx2(b, c))")

 	for i, ts := range input {
 		testdata.OnRecord(func() {
diff --git a/planner/core/casetest/testdata/plan_suite_in.json b/planner/core/casetest/testdata/plan_suite_in.json
index 891b618fcf944..bc2f6867e0f4c 100644
--- a/planner/core/casetest/testdata/plan_suite_in.json
+++ b/planner/core/casetest/testdata/plan_suite_in.json
@@ -1302,7 +1302,9 @@
       "select * from t where (a = 1 and c = 2) or b in (1, 2, 3) order by c limit 2",
       "select * from t where (a = 1 and c = 2) or (b in (1, 2, 3) and c = 3) order by c limit 2",
       "select * from t where (a = 1 or b = 2) and c = 3 order by c limit 2",
-      "select * from t where (a = 1 or b = 2) and c in (1, 2, 3) order by c limit 2"
+      "select * from t where (a = 1 or b = 2) and c in (1, 2, 3) order by c limit 2",
+      "select * from tcommon where a = 1 or b = 1 order by c limit 2",
+      "select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2"
     ]
   }
 ]
diff --git a/planner/core/casetest/testdata/plan_suite_out.json b/planner/core/casetest/testdata/plan_suite_out.json
index f9f784a4d21ac..c7c43ec4a5686 100644
--- a/planner/core/casetest/testdata/plan_suite_out.json
+++ b/planner/core/casetest/testdata/plan_suite_out.json
@@ -8363,8 +8363,8 @@
     {
       "SQL": "select * from t where a = 1 or b = 1 order by c limit 2",
       "Plan": [
-        "TopN 2.00 root test.t.c, offset:0, count:2",
-        "└─IndexMerge 19.99 root type: union",
+        "Projection 2.00 root test.t.a, test.t.b, test.t.c",
+        "└─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
         "  ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
         "  │ └─IndexRangeScan 2.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:true, stats:pseudo",
         "  ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
@@ -8412,8 +8412,8 @@
     {
       "SQL": "select * from t where (a = 1 and c = 2) or (b = 1) order by c limit 2",
       "Plan": [
-        "TopN 2.00 root test.t.c, offset:0, count:2",
-        "└─IndexMerge 10.10 root type: union",
+        "Projection 2.00 root test.t.a, test.t.b, test.t.c",
+        "└─IndexMerge 10.10 root type: union, limit embedded(offset:0, count:2)",
         "  ├─Limit(Build) 0.10 cop[tikv] offset:0, count:2",
         "  │ └─IndexRangeScan 0.10 cop[tikv] table:t, index:idx(a, c) range:[1 2,1 2], keep order:true, stats:pseudo",
         "  ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
@@ -8468,6 +8468,32 @@
         " └─TableRowIDScan 19.99 cop[tikv] table:t keep order:false, stats:pseudo"
       ],
       "Warning": null
+    },
+    {
+      "SQL": "select * from tcommon where a = 1 or b = 1 order by c limit 2",
+      "Plan": [
+        "Projection 2.00 root test.tcommon.a, test.tcommon.b, test.tcommon.c",
+        "└─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
+        "  ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
+        "  │ └─TableRangeScan 2.00 cop[tikv] table:tcommon range:[1,1], keep order:true, stats:pseudo",
+        "  ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
+        "  │ └─IndexRangeScan 2.00 cop[tikv] table:tcommon, index:idx2(b, c) range:[1,1], keep order:true, stats:pseudo",
+        "  └─TableRowIDScan(Probe) 19.99 cop[tikv] table:tcommon keep order:false, stats:pseudo"
+      ],
+      "Warning": null
+    },
+    {
+      "SQL": "select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2",
+      "Plan": [
+        "Projection 2.00 root test.tcommon.a, test.tcommon.b, test.tcommon.c",
+        "└─IndexMerge 11.00 root type: union, limit embedded(offset:0, count:2)",
+        "  ├─Limit(Build) 1.00 cop[tikv] offset:0, count:2",
+        "  │ └─TableRangeScan 1.00 cop[tikv] table:tcommon range:[1 2,1 2], keep order:true, stats:pseudo",
+        "  ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
+        "  │ └─IndexRangeScan 2.00 cop[tikv] table:tcommon, index:idx2(b, c) range:[1,1], keep order:true, stats:pseudo",
+        "  └─TableRowIDScan(Probe) 11.00 cop[tikv] table:tcommon keep order:false, stats:pseudo"
+      ],
+      "Warning": null
     }
   ]
 }
diff --git a/planner/core/explain.go b/planner/core/explain.go
index 72e65f2bbc432..78f5e7dbd07b7 100644
--- a/planner/core/explain.go
+++ b/planner/core/explain.go
@@ -306,10 +306,20 @@ func (p *PhysicalIndexLookUpReader) ExplainInfo() string {

 // ExplainInfo implements Plan interface.
 func (p *PhysicalIndexMergeReader) ExplainInfo() string {
+	var str strings.Builder
 	if p.IsIntersectionType {
-		return "type: intersection"
+		str.WriteString("type: intersection")
+	} else {
+		str.WriteString("type: union")
 	}
-	return "type: union"
+	if p.PushedLimit != nil {
+		str.WriteString(", limit embedded(offset:")
+		str.WriteString(strconv.FormatUint(p.PushedLimit.Offset, 10))
+		str.WriteString(", count:")
+		str.WriteString(strconv.FormatUint(p.PushedLimit.Count, 10))
+		str.WriteString(")")
+	}
+	return str.String()
 }

 // ExplainInfo implements Plan interface.
diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go
index aa416e7a4b4f0..cbf8e2193d439 100644
--- a/planner/core/find_best_task.go
+++ b/planner/core/find_best_task.go
@@ -1244,7 +1244,11 @@ func overwritePartialTableScanSchema(ds *DataSource, ts *PhysicalTableScan) {
 	for i := 0; i < hdColNum; i++ {
 		col := handleCols.GetCol(i)
 		exprCols = append(exprCols, col)
-		infoCols = append(infoCols, col.ToInfo())
+		if c := model.FindColumnInfoByID(ds.TableInfo().Columns, col.ID); c != nil {
+			infoCols = append(infoCols, c)
+		} else {
+			infoCols = append(infoCols, col.ToInfo())
+		}
 	}
 	ts.schema = expression.NewSchema(exprCols...)
 	ts.Columns = infoCols
@@ -2347,6 +2351,7 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper
 		HandleCols:      ds.handleCols,
 		tblCols:         ds.TblCols,
 		tblColHists:     ds.TblColHists,
+		constColsByCond: path.ConstCols,
 		prop:            prop,
 	}.Init(ds.ctx, ds.blockOffset)
 	ts.filterCondition = make([]expression.Expression, len(path.TableFilters))
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index f171a9b32a59b..fd94cdca0d9b1 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -555,6 +555,11 @@ type PhysicalIndexMergeReader struct {
 	// AccessMVIndex indicates whether this IndexMergeReader access a MVIndex.
 	AccessMVIndex bool

+	// PushedLimit is used to avoid unnecessary table scan tasks of IndexMergeReader.
+	PushedLimit *PushedDownLimit
+	// ByItems is used to support sorting the handles returned by partialPlans.
+	ByItems []*util.ByItems
+
 	// PartialPlans flats the partialPlans to construct executor pb.
 	PartialPlans [][]PhysicalPlan
 	// TablePlans flats the tablePlan to construct executor pb.
@@ -566,6 +571,8 @@ type PhysicalIndexMergeReader struct {

 	// Used by partition table.
 	PartitionInfo PartitionInfo
+
+	KeepOrder bool
 }

 // GetAvgTableRowSize return the average row size of table plan.
@@ -846,6 +853,11 @@ type PhysicalTableScan struct {
 	tblCols     []*expression.Column
 	tblColHists *statistics.HistColl
 	prop        *property.PhysicalProperty
+
+	// constColsByCond records the constant part of the index columns caused by the access conds.
+	// e.g. if the index is (a, b, c) and there are filters a = 1 and b = 2, then columns a and b are the constant part.
+	// It is only used by IndexMerge's table scan.
+	constColsByCond []bool
 }

 // Clone implements PhysicalPlan interface.
diff --git a/planner/core/task.go b/planner/core/task.go
index 3263fd31b7270..010e310e7cad5 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1165,6 +1165,13 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 			if partialSel != nil && finalScan.statsInfo().RowCount > 0 {
 				selSelectivityOnPartialScan[i] = partialSel.statsInfo().RowCount / finalScan.statsInfo().RowCount
 			}
+			// TODO: Support partition table later.
+			if plan, ok := finalScan.(*PhysicalTableScan); ok && tblInfo.GetPartitionInfo() == nil {
+				plan.ByItems = p.ByItems
+			}
+			if plan, ok := finalScan.(*PhysicalIndexScan); ok && tblInfo.GetPartitionInfo() == nil {
+				plan.ByItems = p.ByItems
+			}
 			partialScans = append(partialScans, finalScan)
 		}
 	}
@@ -1202,8 +1209,42 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 			return nil, false
 		}
 		clonedTblScan.statsInfo().ScaleByExpectCnt(float64(p.Count+p.Offset) * float64(len(copTsk.idxMergePartPlans)))
+		// TODO: This is a hack; the planner should not prune handle columns when `keepOrder` is true.
+		// TODO: Support partition table later.
+		if tblInfo.GetPartitionInfo() == nil {
+			if tblInfo.PKIsHandle {
+				pk := tblInfo.GetPkColInfo()
+				col := expression.ColInfo2Col(tblScan.tblCols, pk)
+				tblScan.HandleCols = NewIntHandleCols(col)
+				clonedTblScan.Schema().Append(col)
+				clonedTblScan.(*PhysicalTableScan).Columns = append(clonedTblScan.(*PhysicalTableScan).Columns, pk)
+			} else if tblInfo.IsCommonHandle {
+				idxInfo := tblInfo.GetPrimaryKey()
+				tblScan.HandleCols = NewCommonHandleCols(p.SCtx().GetSessionVars().StmtCtx, tblInfo, idxInfo, tblScan.tblCols)
+				for i := 0; i < tblScan.HandleCols.NumCols(); i++ {
+					clonedTblScan.Schema().Append(tblScan.HandleCols.GetCol(i))
+					clonedTblScan.(*PhysicalTableScan).Columns = append(clonedTblScan.(*PhysicalTableScan).Columns, tblScan.HandleCols.GetCol(i).ToInfo())
+				}
+			} else {
+				clonedTblScan.Schema().Append(tblScan.HandleCols.GetCol(0))
+				clonedTblScan.(*PhysicalTableScan).Columns = append(clonedTblScan.(*PhysicalTableScan).Columns, model.NewExtraHandleColInfo())
+			}
+		}
 		copTsk.tablePlan = clonedTblScan
 		copTsk.indexPlanFinished = true
+		rootTask := copTsk.convertToRootTask(p.ctx)
+		// TODO: support order prop push down into partition tables.
+		if tblInfo.GetPartitionInfo() == nil {
+			if indexMerge, ok := rootTask.p.(*PhysicalIndexMergeReader); ok {
+				indexMerge.PushedLimit = &PushedDownLimit{
+					Offset: p.Offset,
+					Count:  p.Count,
+				}
+				indexMerge.ByItems = p.ByItems
+				indexMerge.KeepOrder = true
+				return rootTask, true
+			}
+		}
 	} else {
 		// The normal index scan cases.(single read and double read)
 		propMatched := p.checkOrderPropForSubIndexScan(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp)
@@ -1241,6 +1282,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
 			Count:  p.Count,
 		}
 		extraInfo, extraCol, hasExtraCol := tryGetPkExtraColumn(p.ctx.GetSessionVars(), tblInfo)
+		// TODO: sometimes it will add a duplicate `_tidb_rowid` column in ts.schema()
 		if hasExtraCol {
 			idxLookup.ExtraHandleCol = extraCol
 			ts := idxLookup.TablePlans[0].(*PhysicalTableScan)
@@ -1366,7 +1408,7 @@ func (p *PhysicalTopN) checkSubScans(colsProp *property.PhysicalProperty, isDesc
 		}
 	} else {
 		idxCols, idxColLens := expression.IndexInfo2PrefixCols(x.Columns, x.Schema().Columns, tables.FindPrimaryIndex(x.Table))
-		matched := p.checkOrderPropForSubIndexScan(idxCols, idxColLens, nil, colsProp)
+		matched := p.checkOrderPropForSubIndexScan(idxCols, idxColLens, x.constColsByCond, colsProp)
 		if !matched {
 			return false
 		}
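The core of fetchLoopUnionWithOrderByAndPushedLimit is a bounded top-N merge: every distinct handle coming from the partial plans is pushed into a max-heap keyed by the ORDER BY columns, and the heap is capped at offset+count entries, so the currently-worst candidate is evicted as soon as a better one arrives. The standalone sketch below only illustrates that idea under stated assumptions: it uses none of TiDB's types (row, topNHeap, and the sample data are invented for this example) and relies only on the Go standard library.

// Illustrative sketch, not TiDB code: keep only the limit+offset "best" rows
// while draining several unordered partial-result streams.
package main

import (
	"container/heap"
	"fmt"
)

type row struct {
	handle  int // stands in for the row handle
	sortKey int // stands in for the ORDER BY column value
}

// topNHeap is a max-heap on sortKey, so the worst kept row sits at the top
// and can be evicted as soon as a better candidate arrives.
type topNHeap struct {
	rows        []row
	requiredCnt int // offset + count of the pushed-down limit
}

func (h topNHeap) Len() int            { return len(h.rows) }
func (h topNHeap) Less(i, j int) bool  { return h.rows[i].sortKey > h.rows[j].sortKey }
func (h topNHeap) Swap(i, j int)       { h.rows[i], h.rows[j] = h.rows[j], h.rows[i] }
func (h *topNHeap) Push(x interface{}) { h.rows = append(h.rows, x.(row)) }
func (h *topNHeap) Pop() interface{} {
	last := h.rows[len(h.rows)-1]
	h.rows = h.rows[:len(h.rows)-1]
	return last
}

func main() {
	// Two "partial plans", each ordered by its own index key but unordered
	// with respect to the global ORDER BY column.
	partials := [][]row{
		{{handle: 1, sortKey: 7}, {handle: 2, sortKey: 3}, {handle: 3, sortKey: 9}},
		{{handle: 4, sortKey: 1}, {handle: 5, sortKey: 4}, {handle: 6, sortKey: 8}},
	}

	h := &topNHeap{requiredCnt: 3} // e.g. LIMIT 3 with no offset
	seen := map[int]bool{}         // union semantics: de-duplicate handles
	for _, stream := range partials {
		for _, r := range stream {
			if seen[r.handle] {
				continue
			}
			seen[r.handle] = true
			heap.Push(h, r)
			if h.Len() > h.requiredCnt {
				heap.Pop(h) // drop the current worst row
			}
		}
	}

	// Pop the max-heap back to front to emit rows in ascending sortKey order.
	ordered := make([]row, h.Len())
	for i := len(ordered) - 1; i >= 0; i-- {
		ordered[i] = heap.Pop(h).(row)
	}
	fmt.Println(ordered) // [{4 1} {2 3} {5 4}]
}

Popping the max-heap from back to front yields the kept rows in ascending order, which mirrors why the executor above pops taskHeap in reverse (for i := needCount - 1; i >= 0; i--) when it builds fhs before handing batches to the table-scan workers.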