diff --git a/distsql/BUILD.bazel b/distsql/BUILD.bazel
index cb9bbc63dd524..31c1f3e8ad95e 100644
--- a/distsql/BUILD.bazel
+++ b/distsql/BUILD.bazel
@@ -13,11 +13,13 @@ go_library(
         "//config",
         "//ddl/placement",
         "//errno",
+        "//expression",
         "//infoschema",
         "//kv",
         "//metrics",
         "//parser/mysql",
         "//parser/terror",
+        "//planner/util",
         "//sessionctx",
         "//sessionctx/stmtctx",
         "//sessionctx/variable",
diff --git a/distsql/select_result.go b/distsql/select_result.go
index b26b83f2817e0..52922b0b029b5 100644
--- a/distsql/select_result.go
+++ b/distsql/select_result.go
@@ -16,6 +16,7 @@ package distsql

 import (
     "bytes"
+    "container/heap"
     "context"
     "fmt"
     "strconv"
@@ -26,9 +27,11 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/config"
     "github.com/pingcap/tidb/errno"
+    "github.com/pingcap/tidb/expression"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/metrics"
     "github.com/pingcap/tidb/parser/terror"
+    "github.com/pingcap/tidb/planner/util"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/statistics"
     "github.com/pingcap/tidb/store/copr"
@@ -62,6 +65,7 @@ var (
 var (
     _ SelectResult = (*selectResult)(nil)
     _ SelectResult = (*serialSelectResults)(nil)
+    _ SelectResult = (*sortedSelectResults)(nil)
 )

 // SelectResult is an iterator of coprocessor partial results.
@@ -74,6 +78,160 @@ type SelectResult interface {
     Close() error
 }

+type chunkRowHeap struct {
+    *sortedSelectResults
+}
+
+func (h chunkRowHeap) Len() int {
+    return len(h.rowPtrs)
+}
+
+func (h chunkRowHeap) Less(i, j int) bool {
+    iPtr := h.rowPtrs[i]
+    jPtr := h.rowPtrs[j]
+    return h.lessRow(h.cachedChunks[iPtr.ChkIdx].GetRow(int(iPtr.RowIdx)),
+        h.cachedChunks[jPtr.ChkIdx].GetRow(int(jPtr.RowIdx)))
+}
+
+func (h chunkRowHeap) Swap(i, j int) {
+    h.rowPtrs[i], h.rowPtrs[j] = h.rowPtrs[j], h.rowPtrs[i]
+}
+
+func (h *chunkRowHeap) Push(x interface{}) {
+    h.rowPtrs = append(h.rowPtrs, x.(chunk.RowPtr))
+}
+
+func (h *chunkRowHeap) Pop() interface{} {
+    ret := h.rowPtrs[len(h.rowPtrs)-1]
+    h.rowPtrs = h.rowPtrs[0 : len(h.rowPtrs)-1]
+    return ret
+}
+
+// NewSortedSelectResults is only used for partition tables.
+func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
+    s := &sortedSelectResults{
+        selectResult: selectResult,
+        byItems:      byitems,
+        memTracker:   memTracker,
+    }
+    s.initCompareFuncs()
+    s.buildKeyColumns()
+
+    s.heap = &chunkRowHeap{s}
+    s.cachedChunks = make([]*chunk.Chunk, len(selectResult))
+    return s
+}
+
+type sortedSelectResults struct {
+    selectResult []SelectResult
+    compareFuncs []chunk.CompareFunc
+    byItems      []*util.ByItems
+    keyColumns   []int
+
+    cachedChunks []*chunk.Chunk
+    rowPtrs      []chunk.RowPtr
+    heap         *chunkRowHeap
+
+    memTracker *memory.Tracker
+}
+
+func (ssr *sortedSelectResults) updateCachedChunk(ctx context.Context, idx uint32) error {
+    prevMemUsage := ssr.cachedChunks[idx].MemoryUsage()
+    if err := ssr.selectResult[idx].Next(ctx, ssr.cachedChunks[idx]); err != nil {
+        return err
+    }
+    ssr.memTracker.Consume(ssr.cachedChunks[idx].MemoryUsage() - prevMemUsage)
+    if ssr.cachedChunks[idx].NumRows() == 0 {
+        return nil
+    }
+    heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx, RowIdx: 0})
+    return nil
+}
+
+func (ssr *sortedSelectResults) initCompareFuncs() {
+    ssr.compareFuncs = make([]chunk.CompareFunc, len(ssr.byItems))
+    for i, item := range ssr.byItems {
+        keyType := item.Expr.GetType()
+        ssr.compareFuncs[i] = chunk.GetCompareFunc(keyType)
+    }
+}
+
+func (ssr *sortedSelectResults) buildKeyColumns() {
+    ssr.keyColumns = make([]int, 0, len(ssr.byItems))
+    for _, by := range ssr.byItems {
+        col := by.Expr.(*expression.Column)
+        ssr.keyColumns = append(ssr.keyColumns, col.Index)
+    }
+}
+
+func (ssr *sortedSelectResults) lessRow(rowI, rowJ chunk.Row) bool {
+    for i, colIdx := range ssr.keyColumns {
+        cmpFunc := ssr.compareFuncs[i]
+        cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
+        if ssr.byItems[i].Desc {
+            cmp = -cmp
+        }
+        if cmp < 0 {
+            return true
+        } else if cmp > 0 {
+            return false
+        }
+    }
+    return false
+}
+
+func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) {
+    panic("Not support NextRaw for sortedSelectResults")
+}
+
+func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) {
+    c.Reset()
+    for i := range ssr.cachedChunks {
+        if ssr.cachedChunks[i] == nil {
+            ssr.cachedChunks[i] = c.CopyConstruct()
+            ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage())
+        }
+    }
+
+    if ssr.heap.Len() == 0 {
+        for i := range ssr.cachedChunks {
+            if err = ssr.updateCachedChunk(ctx, uint32(i)); err != nil {
+                return err
+            }
+        }
+    }
+
+    for c.NumRows() < c.RequiredRows() {
+        if ssr.heap.Len() == 0 {
+            break
+        }
+
+        idx := heap.Pop(ssr.heap).(chunk.RowPtr)
+        c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx)))
+
+        if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 {
+            if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil {
+                return err
+            }
+        } else {
+            heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx.ChkIdx, RowIdx: idx.RowIdx + 1})
+        }
+    }
+    return nil
+}
+
+func (ssr *sortedSelectResults) Close() (err error) {
+    for i, sr := range ssr.selectResult {
+        err = sr.Close()
+        if err != nil {
+            return err
+        }
+        ssr.memTracker.Consume(-ssr.cachedChunks[i].MemoryUsage())
+        ssr.cachedChunks[i] = nil
+    }
+    return nil
+}
+
 // NewSerialSelectResults create a SelectResult which will read each SelectResult serially.
 func NewSerialSelectResults(selectResults []SelectResult) SelectResult {
     return &serialSelectResults{
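The new sortedSelectResults above is essentially a k-way merge: every partition's SelectResult already returns rows in the requested order, and a heap repeatedly yields the smallest (or, for DESC, largest) head row across partitions until the caller's chunk is full. Below is a minimal, self-contained sketch of the same idea over plain int slices; the names and helper types here are illustrative only and are not part of the patch.

// Standalone sketch (not part of the patch): a heap-based k-way merge of
// already-sorted streams, reduced to plain int slices so it can run on its own.
package main

import (
	"container/heap"
	"fmt"
)

// item records which stream a value came from and its position in that stream,
// playing the role chunk.RowPtr plays in sortedSelectResults.
type item struct {
	val, stream, pos int
}

// mergeHeap orders items by value, mirroring how chunkRowHeap orders RowPtrs via lessRow.
type mergeHeap []item

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].val < h[j].val }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

// mergeSorted merges already-sorted streams and keeps only the first limit values,
// the way a root Limit on top of sortedSelectResults would.
func mergeSorted(streams [][]int, limit int) []int {
	h := &mergeHeap{}
	for i, s := range streams {
		if len(s) > 0 {
			heap.Push(h, item{val: s[0], stream: i, pos: 0})
		}
	}
	out := make([]int, 0, limit)
	for h.Len() > 0 && len(out) < limit {
		top := heap.Pop(h).(item)
		out = append(out, top.val)
		// Re-push the next element of the same stream, mirroring how Next pushes
		// RowPtr{ChkIdx, RowIdx+1} or refills the chunk via updateCachedChunk.
		if next := top.pos + 1; next < len(streams[top.stream]) {
			heap.Push(h, item{val: streams[top.stream][next], stream: top.stream, pos: next})
		}
	}
	return out
}

func main() {
	// Each inner slice plays the role of one partition's already-ordered result.
	fmt.Println(mergeSorted([][]int{{1, 4, 9}, {2, 3, 10}, {5, 6}}, 5)) // [1 2 3 4 5]
}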
diff --git a/executor/builder.go b/executor/builder.go
index 34f1822810179..f2cb3e91cef6f 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -3359,6 +3359,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
         table:          tbl,
         keepOrder:      ts.KeepOrder,
         desc:           ts.Desc,
+        byItems:        ts.ByItems,
         columns:        ts.Columns,
         paging:         paging,
         corColInFilter: b.corColInDistPlan(v.TablePlans),
@@ -3668,6 +3669,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
         keepOrder:      is.KeepOrder,
         desc:           is.Desc,
         columns:        is.Columns,
+        byItems:        is.ByItems,
         paging:         paging,
         corColInFilter: b.corColInDistPlan(v.IndexPlans),
         corColInAccess: b.corColInAccess(v.IndexPlans[0]),
diff --git a/executor/distsql.go b/executor/distsql.go
index 743264e80546a..5460fc76825a7 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -35,6 +35,7 @@ import (
     "github.com/pingcap/tidb/parser/mysql"
     "github.com/pingcap/tidb/parser/terror"
     plannercore "github.com/pingcap/tidb/planner/core"
+    plannerutil "github.com/pingcap/tidb/planner/util"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessionctx/stmtctx"
     "github.com/pingcap/tidb/statistics"
@@ -193,6 +194,8 @@ type IndexReaderExecutor struct {
     keepOrder bool
     desc      bool
+    // byItems is only used by partition tables with orderBy + pushedLimit.
+    byItems   []*plannerutil.ByItems

     corColInFilter bool
     corColInAccess bool
@@ -294,6 +297,25 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
     return e.open(ctx, kvRanges)
 }

+func (e *IndexReaderExecutor) buildKVReq(ctx context.Context, r []kv.KeyRange) (*kv.Request, error) {
+    var builder distsql.RequestBuilder
+    builder.SetKeyRanges(r).
+        SetDAGRequest(e.dagPB).
+        SetStartTS(e.startTS).
+        SetDesc(e.desc).
+        SetKeepOrder(e.keepOrder).
+        SetTxnScope(e.txnScope).
+        SetReadReplicaScope(e.readReplicaScope).
+        SetIsStaleness(e.isStaleness).
+        SetFromSessionVars(e.ctx.GetSessionVars()).
+        SetFromInfoSchema(e.ctx.GetInfoSchema()).
+        SetMemTracker(e.memTracker).
+        SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.netDataSize)).
+        SetConnID(e.ctx.GetSessionVars().ConnectionID)
+    kvReq, err := builder.Build()
+    return kvReq, err
+}
+
 func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
     var err error
     if e.corColInFilter {
@@ -324,29 +346,38 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
     slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
         return bytes.Compare(i.StartKey, j.StartKey) < 0
     })
-    var builder distsql.RequestBuilder
-    builder.SetKeyRanges(kvRanges).
-        SetDAGRequest(e.dagPB).
-        SetStartTS(e.startTS).
-        SetDesc(e.desc).
-        SetKeepOrder(e.keepOrder).
-        SetTxnScope(e.txnScope).
-        SetReadReplicaScope(e.readReplicaScope).
-        SetIsStaleness(e.isStaleness).
-        SetFromSessionVars(e.ctx.GetSessionVars()).
-        SetFromInfoSchema(e.ctx.GetInfoSchema()).
-        SetMemTracker(e.memTracker).
-        SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.netDataSize)).
-        SetConnID(e.ctx.GetSessionVars().ConnectionID)
-    kvReq, err := builder.Build()
-    if err != nil {
-        e.feedback.Invalidate()
-        return err
-    }
-    e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
-    if err != nil {
-        e.feedback.Invalidate()
-        return err
+    // use sortedSelectResults only when byItems is pushed down and there is more than one partition
+    if e.byItems == nil || len(e.partitions) <= 1 {
+        kvReq, err := e.buildKVReq(ctx, kvRanges)
+        if err != nil {
+            e.feedback.Invalidate()
+            return err
+        }
+        e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
+        if err != nil {
+            e.feedback.Invalidate()
+            return err
+        }
+    } else {
+        kvReqs := make([]*kv.Request, 0, len(kvRanges))
+        for _, kvRange := range kvRanges {
+            kvReq, err := e.buildKVReq(ctx, []kv.KeyRange{kvRange})
+            if err != nil {
+                e.feedback.Invalidate()
+                return err
+            }
+            kvReqs = append(kvReqs, kvReq)
+        }
+        var results []distsql.SelectResult
+        for _, kvReq := range kvReqs {
+            result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
+            if err != nil {
+                e.feedback.Invalidate()
+                return err
+            }
+            results = append(results, result)
+        }
+        e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
     }
     return nil
 }
diff --git a/executor/distsqltest/BUILD.bazel b/executor/distsqltest/BUILD.bazel
index 41bfde36d383a..72908a9279fcc 100644
--- a/executor/distsqltest/BUILD.bazel
+++ b/executor/distsqltest/BUILD.bazel
@@ -13,6 +13,7 @@ go_test(
         "//config",
         "//kv",
         "//meta/autoid",
+        "//sessionctx/variable",
         "//testkit",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//tikv",
diff --git a/executor/distsqltest/distsql_test.go b/executor/distsqltest/distsql_test.go
index 59017e055bf41..6d634b12f6a7f 100644
--- a/executor/distsqltest/distsql_test.go
+++ b/executor/distsqltest/distsql_test.go
@@ -21,6 +21,7 @@ import (
     "testing"

     "github.com/pingcap/tidb/kv"
+    "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/pingcap/tidb/testkit"
     "github.com/stretchr/testify/require"
 )
@@ -61,18 +62,16 @@ func TestDistsqlPartitionTableConcurrency(t *testing.T) {
     // 20-ranges-partitioned table checker
     ctx3 := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
         require.Equal(t, req.KeyRanges.PartitionNum(), 20)
-        require.Equal(t, req.Concurrency, 15)
+        require.Equal(t, req.Concurrency, variable.DefDistSQLScanConcurrency)
     })
     ctxs := []context.Context{ctx1, ctx2, ctx3}
     for i, tbl := range []string{"t1", "t2", "t3"} {
         ctx := ctxs[i]
-        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 1", tbl)).
-            Check(testkit.Rows("0 0"))
-        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 5", tbl)).
-            Check(testkit.Rows("0 0", "50 50", "100 100", "150 150", "200 200"))
-        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 1", tbl)).
-            Check(testkit.Rows("950 950"))
-        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 5", tbl)).
-            Check(testkit.Rows("950 950", "900 900", "850 850", "800 800", "750 750"))
+        // If ORDER BY were added here, the concurrency would always be 1,
+        // because we use a different kv.Request for each partition in TableReader.
+        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
+        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
+        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
+        tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
     }
 }
diff --git a/executor/partition_table.go b/executor/partition_table.go
index 714cdc206b95e..37a62627ba6cf 100644
--- a/executor/partition_table.go
+++ b/executor/partition_table.go
@@ -23,6 +23,9 @@ import (
 )

 func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, recursive bool, partitionIDs []int64) error {
+    if exec == nil {
+        return nil
+    }
     var child *tipb.Executor
     switch exec.Tp {
     case tipb.ExecType_TypeTableScan:
diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go
index 6595bfb5b1b37..094e113c01e37 100644
--- a/executor/partition_table_test.go
+++ b/executor/partition_table_test.go
@@ -530,7 +530,7 @@ func TestOrderByAndLimit(t *testing.T) {
         queryPartition := fmt.Sprintf("select * from trange use index(idx_a) where a > %v order by a, b limit %v;", x, y)
         queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v order by a, b limit %v;", x, y)
         require.True(t, tk.HasPlan(queryPartition, "IndexLookUp")) // check if IndexLookUp is used
-        tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
+        tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
     }

     // test indexLookUp with order property pushed down.
@@ -544,7 +544,7 @@ func TestOrderByAndLimit(t *testing.T) {
         maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0]
         queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
         queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
-        queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
+        queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
         queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y)
         require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit"))
         require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp"))
@@ -570,7 +570,7 @@ func TestOrderByAndLimit(t *testing.T) {
         queryPartition := fmt.Sprintf("select * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
         queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
         require.True(t, tk.HasPlan(queryPartition, "TableReader")) // check if tableReader is used
-        tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
+        tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
     }

     // test tableReader with order property pushed down.
@@ -589,10 +589,10 @@ func TestOrderByAndLimit(t *testing.T) {
         require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
         require.False(t, tk.HasPlan(queryHashPartition, "Limit"))
         require.False(t, tk.HasPlan(queryListPartition, "Limit"))
-        regularResult := tk.MustQuery(queryRegular).Sort().Rows()
-        tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
-        tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
-        tk.MustQuery(queryListPartition).Sort().Check(regularResult)
+        regularResult := tk.MustQuery(queryRegular).Rows()
+        tk.MustQuery(queryRangePartition).Check(regularResult)
+        tk.MustQuery(queryHashPartition).Check(regularResult)
+        tk.MustQuery(queryListPartition).Check(regularResult)

     // test int pk
     // To be simplified, we only read column a.
@@ -622,9 +622,9 @@ func TestOrderByAndLimit(t *testing.T) {
         require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
         require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
         require.True(t, tk.HasPlan(queryListPartition, "Limit"))
-        require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
-        require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
-        require.True(t, tk.HasPlan(queryListPartition, "TopN"))
+        require.False(t, tk.HasPlan(queryRangePartition, "TopN")) // can be fully pushed down for the TableScan executor
+        require.False(t, tk.HasPlan(queryHashPartition, "TopN"))
+        require.False(t, tk.HasPlan(queryListPartition, "TopN"))
         regularResult = tk.MustQuery(queryRegular).Rows()
         tk.MustQuery(queryRangePartition).Check(regularResult)
         tk.MustQuery(queryHashPartition).Check(regularResult)
@@ -695,7 +695,7 @@ func TestOrderByAndLimit(t *testing.T) {
         queryPartition := fmt.Sprintf("select a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
         queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y)
         require.True(t, tk.HasPlan(queryPartition, "IndexReader")) // check if indexReader is used
-        tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
+        tk.MustQuery(queryPartition).Check(tk.MustQuery(queryRegular).Rows())
     }

     // test indexReader with order property pushed down.
@@ -705,27 +705,40 @@ func TestOrderByAndLimit(t *testing.T) {
         x := rand.Intn(1099)
         y := rand.Intn(2000) + 1
         queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
-        queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y)
+        queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash use index(idx_a) where a > %v order by a limit %v;", x, y)
         queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y)
         require.True(t, tk.HasPlan(queryRangePartition, "IndexReader")) // check if indexReader is used
         require.True(t, tk.HasPlan(queryHashPartition, "IndexReader"))
         require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
         require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
-        regularResult := tk.MustQuery(queryRegular).Sort().Rows()
-        tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
-        tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
+        require.False(t, tk.HasPlan(queryRangePartition, "TopN")) // limit is fully pushed down
+        require.False(t, tk.HasPlan(queryHashPartition, "TopN"))
+        regularResult := tk.MustQuery(queryRegular).Rows()
+        tk.MustQuery(queryRangePartition).Check(regularResult)
+        tk.MustQuery(queryHashPartition).Check(regularResult)
     }

     // test indexMerge
     for i := 0; i < 100; i++ {
-        // explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // check if IndexMerge is used
-        // select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // can return the correct value
+        // explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a, b limit {x}; // check if IndexMerge is used
+        // select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a, b limit {x}; // can return the correct value
         y := rand.Intn(2000) + 1
-        queryPartition := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > 2 or b < 5 order by a, b limit %v;", y)
+        queryHashPartition := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > 2 or b < 5 order by a, b limit %v;", y)
         queryRegular := fmt.Sprintf("select * from tregular where a > 2 or b < 5 order by a, b limit %v;", y)
-        require.True(t, tk.HasPlan(queryPartition, "IndexMerge")) // check if indexMerge is used
-        tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows())
-    }
+        require.True(t, tk.HasPlan(queryHashPartition, "IndexMerge")) // check if indexMerge is used
+        tk.MustQuery(queryHashPartition).Check(tk.MustQuery(queryRegular).Rows())
+    }
+
+    // test that the query is killed when memory usage exceeds `tidb_mem_quota_query`
+    originMemQuota := tk.MustQuery("show variables like 'tidb_mem_quota_query'").Rows()[0][1].(string)
+    originOOMAction := tk.MustQuery("show variables like 'tidb_mem_oom_action'").Rows()[0][1].(string)
+    tk.MustExec("set session tidb_mem_quota_query=128")
+    tk.MustExec("set global tidb_mem_oom_action=CANCEL")
+    err := tk.QueryToErr("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > 1 order by a limit 2000")
+    require.Error(t, err)
+    require.Regexp(t, "Out Of Memory Quota.*", err)
+    tk.MustExec(fmt.Sprintf("set session tidb_mem_quota_query=%s", originMemQuota))
+    tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action=%s", originOOMAction))
 }

 func TestOrderByOnUnsignedPk(t *testing.T) {
diff --git a/executor/table_reader.go b/executor/table_reader.go
index 6d2cbb4224b6d..7385b5f8500c1 100644
--- a/executor/table_reader.go
+++ b/executor/table_reader.go
@@ -28,6 +28,7 @@ import (
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/parser/model"
     plannercore "github.com/pingcap/tidb/planner/core"
+    "github.com/pingcap/tidb/planner/util"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessiontxn"
     "github.com/pingcap/tidb/statistics"
@@ -105,6 +106,8 @@ type TableReaderExecutor struct {
     keepOrder bool
     desc      bool
+    // byItems is only used by partition tables with orderBy + pushedLimit.
+    byItems   []*util.ByItems
     paging    bool
     storeType kv.StoreType
     // corColInFilter tells whether there's correlated column in filter.
@@ -185,6 +188,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
             e.feedback.Invalidate()
         }
     }
+
     firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)

     // Treat temporary table as dummy table, avoid sending distsql request to TiKV.
@@ -311,6 +315,23 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
         return result, nil
     }

+    // use sortedSelectResults here when the limit is pushed down for a partition table.
+    if e.kvRangeBuilder != nil && e.byItems != nil {
+        kvReqs, err := e.buildKVReqSeparately(ctx, ranges)
+        if err != nil {
+            return nil, err
+        }
+        var results []distsql.SelectResult
+        for _, kvReq := range kvReqs {
+            result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
+            if err != nil {
+                return nil, err
+            }
+            results = append(results, result)
+        }
+        return distsql.NewSortedSelectResults(results, e.byItems, e.memTracker), nil
+    }
+
     kvReq, err := e.buildKVReq(ctx, ranges)
     if err != nil {
         return nil, err
diff --git a/planner/core/casetest/testdata/integration_partition_suite_out.json b/planner/core/casetest/testdata/integration_partition_suite_out.json
index 9e9999cd9d4ba..2fbe7f417621f 100644
--- a/planner/core/casetest/testdata/integration_partition_suite_out.json
+++ b/planner/core/casetest/testdata/integration_partition_suite_out.json
@@ -1173,7 +1173,7 @@
     {
       "SQL": "explain format='brief' select a from trange use index (ia) where a > 10 order by a limit 10",
       "Plan": [
-        "TopN 10.00 root test.trange.a, offset:0, count:10",
+        "Limit 10.00 root offset:0, count:10",
         "└─IndexReader 10.00 root partition:all index:Limit",
         "  └─Limit 10.00 cop[tikv] offset:0, count:10",
         "    └─IndexRangeScan 10.00 cop[tikv] table:trange, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo"
       ]
     },
@@ -1182,7 +1182,7 @@
     {
       "SQL": "explain format='brief' select a from tlist use index (ia) where a > 10 order by a limit 10",
       "Plan": [
-        "TopN 10.00 root test.tlist.a, offset:0, count:10",
+        "Limit 10.00 root offset:0, count:10",
         "└─IndexReader 10.00 root partition:all index:Limit",
         "  └─Limit 10.00 cop[tikv] offset:0, count:10",
         "    └─IndexRangeScan 10.00 cop[tikv] table:tlist, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo"
       ]
     },
@@ -1191,7 +1191,7 @@
     {
       "SQL": "explain format='brief' select a from thash use index (ia) where a > 10 order by a limit 10",
       "Plan": [
-        "TopN 10.00 root test.thash.a, offset:0, count:10",
+        "Limit 10.00 root offset:0, count:10",
         "└─IndexReader 10.00 root partition:all index:Limit",
         "  └─Limit 10.00 cop[tikv] offset:0, count:10",
         "    └─IndexRangeScan 10.00 cop[tikv] table:thash, index:ia(a) range:(10,+inf], keep order:false, stats:pseudo"
       ]
     },
@@ -1341,7 +1341,7 @@
       "SQL": "explain format='brief' select a from trange use index () where b > 10 order by b limit 10",
       "Plan": [
         "Projection 10.00 root test.trange.a",
-        "└─TopN 10.00 root test.trange.b, offset:0, count:10",
+        "└─Limit 10.00 root offset:0, count:10",
         "  └─TableReader 10.00 root partition:all data:Limit",
         "    └─Limit 10.00 cop[tikv] offset:0, count:10",
         "      └─TableRangeScan 10.00 cop[tikv] table:trange range:(10,+inf], keep order:true, stats:pseudo"
       ]
     },
@@ -1351,7 +1351,7 @@
       "SQL": "explain format='brief' select a from tlist use index () where b > 10 order by b limit 10",
       "Plan": [
         "Projection 10.00 root test.tlist.a",
-        "└─TopN 10.00 root test.tlist.b, offset:0, count:10",
+        "└─Limit 10.00 root offset:0, count:10",
         "  └─TableReader 10.00 root partition:all data:Limit",
         "    └─Limit 10.00 cop[tikv] offset:0, count:10",
         "      └─TableRangeScan 10.00 cop[tikv] table:tlist range:(10,+inf], keep order:true, stats:pseudo"
       ]
     },
@@ -1361,7 +1361,7 @@
       "SQL": "explain format='brief' select a from thash use index () where b > 10 order by b limit 10",
       "Plan": [
         "Projection 10.00 root test.thash.a",
-        "└─TopN 10.00 root test.thash.b, offset:0, count:10",
+        "└─Limit 10.00 root offset:0, count:10",
         "  └─TableReader 10.00 root partition:all data:Limit",
         "    └─Limit 10.00 cop[tikv] offset:0, count:10",
         "      └─TableRangeScan 10.00 cop[tikv] table:thash range:(10,+inf], keep order:true, stats:pseudo"
       ]
     },
@@ -1382,7 +1382,7 @@
       "SQL": "explain format='brief' select a from trange use index () where a > 10 order by b limit 10",
       "Plan": [
         "Projection 10.00 root test.trange.a",
-        "└─TopN 10.00 root test.trange.b, offset:0, count:10",
+        "└─Limit 10.00 root offset:0, count:10",
         "  └─TableReader 10.00 root partition:all data:Limit",
         "    └─Limit 10.00 cop[tikv] offset:0, count:10",
         "      └─Selection 10.00 cop[tikv] gt(test.trange.a, 10)",
@@ -1393,7 +1393,7 @@
       "SQL": "explain format='brief' select a from tlist use index () where a > 10 order by b limit 10",
       "Plan": [
         "Projection 10.00 root test.tlist.a",
-        "└─TopN 10.00 root test.tlist.b, offset:0, count:10",
+        "└─Limit 10.00 root offset:0, count:10",
         "  └─TableReader 10.00 root partition:all data:Limit",
         "    └─Limit 10.00 cop[tikv] offset:0, count:10",
         "      └─Selection 10.00 cop[tikv] gt(test.tlist.a, 10)",
@@ -1404,7 +1404,7 @@
       "SQL": "explain format='brief' select a from thash use index () where a > 10 order by b limit 10",
       "Plan": [
         "Projection 10.00 root test.thash.a",
-        "└─TopN 10.00 root test.thash.b, offset:0, count:10",
+        "└─Limit 10.00 root offset:0, count:10",
         "  └─TableReader 10.00 root partition:all data:Limit",
         "    └─Limit 10.00 cop[tikv] offset:0, count:10",
         "      └─Selection 10.00 cop[tikv] gt(test.thash.a, 10)",
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 315b47fb3dd22..fbf7eaf73f2b2 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -670,6 +670,9 @@ type PhysicalIndexScan struct {
     isPartition bool
     Desc        bool
     KeepOrder   bool
+    // ByItems is only used by partition tables with orderBy + pushedLimit.
+    ByItems     []*util.ByItems
+
     // DoubleRead means if the index executor will read kv two times.
     // If the query requires the columns that don't belong to index, DoubleRead will be true.
     DoubleRead bool
@@ -825,6 +828,8 @@ type PhysicalTableScan struct {
     // KeepOrder is true, if sort data by scanning pkcol,
     KeepOrder bool
     Desc      bool
+    // ByItems is only used by partition tables with orderBy + pushedLimit.
+    ByItems   []*util.ByItems

     isChildOfIndexLookUp bool
diff --git a/planner/core/task.go b/planner/core/task.go
index 09a3254c2ed2f..163faa9b492f9 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1211,6 +1211,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
             return nil, false
         }
         idxScan.Desc = isDesc
+        idxScan.ByItems = p.ByItems
         childProfile := copTsk.plan().statsInfo()
         newCount := p.Offset + p.Count
         stats := deriveLimitStats(childProfile, float64(newCount))
@@ -1230,6 +1231,18 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
             scaledRowCount := child.Stats().RowCount / selSelectivity
             idxScan.SetStats(idxScan.Stats().ScaleByExpectCnt(scaledRowCount))
         }
+
+        rootTask := copTsk.convertToRootTask(p.ctx)
+        // only IndexReader is supported now.
+        if _, ok := rootTask.p.(*PhysicalIndexReader); ok {
+            rootLimit := PhysicalLimit{
+                Count:       p.Count,
+                Offset:      p.Offset,
+                PartitionBy: newPartitionBy,
+            }.Init(p.SCtx(), stats, p.SelectBlockOffset())
+            rootLimit.SetSchema(rootTask.plan().Schema())
+            return attachPlan2Task(rootLimit, rootTask), true
+        }
     } else if copTsk.indexPlan == nil {
         if tblScan.HandleCols == nil {
             return nil, false
@@ -1251,6 +1264,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
         tblScan.Desc = isDesc
         // SplitRangesAcrossInt64Boundary needs the KeepOrder flag. See that func and the struct tableResultHandler for more details.
         tblScan.KeepOrder = true
+        tblScan.ByItems = p.ByItems
         childProfile := copTsk.plan().statsInfo()
         newCount := p.Offset + p.Count
         stats := deriveLimitStats(childProfile, float64(newCount))
@@ -1271,6 +1285,15 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
             scaledRowCount := child.Stats().RowCount / selSelectivity
             tblScan.SetStats(tblScan.Stats().ScaleByExpectCnt(scaledRowCount))
         }
+
+        rootTask := copTsk.convertToRootTask(p.ctx)
+        rootLimit := PhysicalLimit{
+            Count:       p.Count,
+            Offset:      p.Offset,
+            PartitionBy: newPartitionBy,
+        }.Init(p.SCtx(), stats, p.SelectBlockOffset())
+        rootLimit.SetSchema(rootTask.plan().Schema())
+        return attachPlan2Task(rootLimit, rootTask), true
     } else {
         return nil, false
     }
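A side note on the memory accounting used by the new reader: updateCachedChunk charges the memory.Tracker with the difference between a cached chunk's usage before and after it is refilled, and Close releases whatever is still charged. The sketch below is illustrative only and not part of the patch; tracker and buffer are simplified stand-ins for memory.Tracker and chunk.Chunk, kept small to show why delta-based charging keeps the tracker equal to the live footprint of the cache.

// Standalone sketch (not part of the patch): delta-based memory accounting.
package main

import "fmt"

// tracker is a stand-in for memory.Tracker; only Consume is modeled here.
type tracker struct{ bytes int64 }

func (t *tracker) Consume(delta int64) { t.bytes += delta }

// buffer is a stand-in for a cached chunk whose size changes when it is refilled.
type buffer struct{ size int64 }

func (b *buffer) MemoryUsage() int64 { return b.size }

// refill replaces the buffer contents and charges the tracker only for the change
// in size, so the tracker always reflects the current footprint of the cache.
func refill(t *tracker, b *buffer, newSize int64) {
	prev := b.MemoryUsage()
	b.size = newSize
	t.Consume(b.MemoryUsage() - prev)
}

func main() {
	t := &tracker{}
	b := &buffer{}
	t.Consume(b.MemoryUsage()) // initial charge, as Next does for a freshly created cached chunk
	refill(t, b, 4096)
	refill(t, b, 1024)
	fmt.Println(t.bytes) // 1024: the tracker matches the live buffer size
}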