diff --git a/pkg/distsql/request_builder.go b/pkg/distsql/request_builder.go index 29d192c913a3b..31828dedd3da5 100644 --- a/pkg/distsql/request_builder.go +++ b/pkg/distsql/request_builder.go @@ -19,6 +19,7 @@ import ( "math" "sort" "sync/atomic" + "unsafe" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" @@ -740,7 +741,10 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx krs[i] = make([]kv.KeyRange, 0, len(ranges)) } - const checkSignalStep = 8 + if memTracker != nil { + memTracker.Consume(int64(unsafe.Sizeof(kv.KeyRange{})) * int64(len(ranges))) + } + const checkSignalStep = 64 var estimatedMemUsage int64 // encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to // check the interrupt signal periodically. @@ -768,6 +772,11 @@ func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idx if interruptSignal != nil && interruptSignal.Load().(bool) { return kv.NewPartitionedKeyRanges(nil), nil } + if memTracker != nil { + // We use the Tracker.Consume function to check the memory usage of the current SQL. + // If the memory exceeds the quota, kill the SQL. + memTracker.Consume(1) + } } } return kv.NewPartitionedKeyRanges(krs), nil diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 8619ade9e3566..e6a1127a4b2c1 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -514,6 +514,14 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { return err } } + + if e.memTracker != nil { + e.memTracker.Reset() + } else { + e.memTracker = memory.NewTracker(e.ID(), -1) + } + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + err = e.buildTableKeyRanges() if err != nil { return err @@ -544,7 +552,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { if e.index.ID == -1 { kvRange, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, ranges) } else { - kvRange, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, ranges) + kvRange, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, physicalID, e.index.ID, ranges, e.memTracker, nil) } if err != nil { return err @@ -557,7 +565,7 @@ func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { if e.index.ID == -1 { kvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{physicalID}, e.ranges) } else { - kvRanges, err = distsql.IndexRangesToKVRanges(sc, physicalID, e.index.ID, e.ranges) + kvRanges, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, physicalID, e.index.ID, e.ranges, e.memTracker, nil) } e.kvRanges = kvRanges.FirstPartitionRange() } diff --git a/pkg/executor/executor_pkg_test.go b/pkg/executor/executor_pkg_test.go index c22bc039f5053..7f00b7785e7b8 100644 --- a/pkg/executor/executor_pkg_test.go +++ b/pkg/executor/executor_pkg_test.go @@ -135,7 +135,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) { } require.Equal(t, 2*bytesConsumed1, bytesConsumed2) - require.Equal(t, int64(20760), bytesConsumed1) + require.Equal(t, int64(25570), bytesConsumed1) } func generateIndexRange(vals ...int64) *ranger.Range {