From 489edfc26ddcb8b3f3908cc50f5c81b62ee25a63 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 25 Apr 2020 00:27:45 +0800 Subject: [PATCH] executor: add concurrency limit on the union executor If we do not limit the concurrency on the union executor, 'select * from t limit 1000' could make TiDB OOM on a large partition table. --- executor/executor.go | 55 ++++++++++++++++++++++++++++++++++++--- executor/executor_test.go | 12 +++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 23643de042f21..5d77d2d6f593f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1461,19 +1461,66 @@ func (e *UnionExec) Open(ctx context.Context) error { return nil } +var sleepDur = []time.Duration{ + time.Microsecond, + 50 * time.Microsecond, + 100 * time.Microsecond, + 500 * time.Microsecond, + time.Millisecond, + 2 * time.Millisecond, + 3 * time.Millisecond, + 4 * time.Millisecond, + 5 * time.Millisecond, + 6 * time.Millisecond, +} + +// generateToken generates tokens to control: +// 1. How many workers in total can be spawn. +// 2. How long to wait before spawning a new worker. +func generateToken(tokenCh chan<- struct{}, exitCh <-chan struct{}) { + t := time.NewTimer(sleepDur[0]) + defer t.Stop() + // Generate 10 tokens at most, so the worker count is limited. + for _, duration := range sleepDur { + tokenCh <- struct{}{} + select { + case <-t.C: + t.Reset(duration) + case <-exitCh: + return + } + } +} + func (e *UnionExec) initialize(ctx context.Context) { e.resultPool = make(chan *unionWorkerResult, len(e.children)) e.resourcePools = make([]chan *chunk.Chunk, len(e.children)) + + tokenCh := make(chan struct{}, len(sleepDur)) + go generateToken(tokenCh, e.finished) + for i := range e.children { e.resourcePools[i] = make(chan *chunk.Chunk, 1) e.resourcePools[i] <- e.childrenResults[i] - e.wg.Add(1) - go e.resultPuller(ctx, i) } + e.wg.Add(len(e.children)) + go func() { + for i := range e.children { + select { + // If we do not limit the goroutine count here, 'select * from t limit 1000' on a + // very large partitioned table makes TiDB OOM. Because before the limit take + // effect, the background table reader has already fetch too much data. + case <-tokenCh: + go e.resultPuller(ctx, i, tokenCh) + case <-e.finished: + e.wg.Done() + } + } + }() go e.waitAllFinished() } -func (e *UnionExec) resultPuller(ctx context.Context, childID int) { +func (e *UnionExec) resultPuller(ctx context.Context, childID int, tokenCh chan struct{}) { result := &unionWorkerResult{ err: nil, chk: nil, @@ -1490,6 +1537,8 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) { e.stopFetchData.Store(true) } e.wg.Done() + // Release the token so we can spawn a new resultPuller. + tokenCh <- struct{}{} }() for { if e.stopFetchData.Load().(bool) { diff --git a/executor/executor_test.go b/executor/executor_test.go index efcea63d0766d..384b6a2b439d2 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1340,6 +1340,18 @@ func (s *testSuiteP2) TestUnion(c *C) { tk.MustQuery("select count(distinct a), sum(distinct a), avg(distinct a) from (select a from t union all select b from t) tmp;").Check(testkit.Rows("1 1.000 1.0000000")) } +func (s *testSuiteP2) TestUnionLimit(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists union_limit") + tk.MustExec("create table union_limit (id int) partition by hash(id) partitions 30") + for i := 0; i < 60; i++ { + tk.MustExec(fmt.Sprintf("insert into union_limit values (%d)", i)) + } + // Cover the code for worker count limit in the union executor. + tk.MustQuery("select * from union_limit limit 10") +} + func (s *testSuiteP1) TestNeighbouringProj(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test")