Skip to content

Commit

Permalink
executor: fix goroutine leak in querying slow log (#32757) (#32780)
Browse files Browse the repository at this point in the history
close #32656
  • Loading branch information
ti-srebot authored Apr 15, 2022
1 parent d085522 commit 79db6f5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
15 changes: 13 additions & 2 deletions executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,25 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
if e.stats != nil {
e.stats.readFile += time.Since(startTime)
}
failpoint.Inject("mockReadSlowLogSlow", func(val failpoint.Value) {
if val.(bool) {
signals := ctx.Value("signals").([]chan int)
signals[0] <- 1
<-signals[1]
}
})
for i := range logs {
log := logs[i]
t := slowLogTask{}
t.resultCh = make(chan parsedSlowLog, 1)
start := offset
wg.Add(1)
ch <- 1
e.taskList <- t
select {
case <-ctx.Done():
return
case e.taskList <- t:
}
wg.Add(1)
go func() {
defer wg.Done()
result, err := e.parseLog(ctx, sctx, log, start)
Expand Down
50 changes: 50 additions & 0 deletions executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"os"
"runtime/pprof"
"strings"
"time"

Expand Down Expand Up @@ -604,6 +605,55 @@ select 9;`
}
}

func (s *testExecSuite) TestCancelParseSlowLog(c *C) {
fileName := "tidb-slow-2020-02-14T19-04-05.01.log"
slowLog := `# Time: 2019-04-28T15:24:04.309074+08:00
select * from t;`
prepareLogs(c, []string{slowLog}, []string{fileName})
defer func() {
removeFiles([]string{fileName})
}()
sctx := mock.NewContext()
sctx.GetSessionVars().SlowQueryFile = fileName

retriever, err := newSlowQueryRetriever()
c.Assert(err, IsNil)
var signal1, signal2 = make(chan int, 1), make(chan int, 1)
ctx := context.WithValue(context.Background(), "signals", []chan int{signal1, signal2})
ctx, cancel := context.WithCancel(ctx)
err = failpoint.Enable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow", "return(true)")
c.Assert(err, IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow"), IsNil)
}()
go func() {
_, err := retriever.retrieve(ctx, sctx)
c.Assert(err, ErrorMatches, ".*context canceled.*")
}()
// Wait for parseSlowLog going to add tasks.
<-signal1
// Cancel the retriever and then dataForSlowLog exits.
cancel()
// Assume that there are already unprocessed tasks.
retriever.taskList <- slowLogTask{}
// Let parseSlowLog continue.
signal2 <- 1
// parseSlowLog should exit immediately.
time.Sleep(1 * time.Second)
c.Assert(checkGoroutineExists("parseSlowLog"), IsFalse)
}

func checkGoroutineExists(keyword string) bool {
buf := new(bytes.Buffer)
profile := pprof.Lookup("goroutine")
err := profile.WriteTo(buf, 1)
if err != nil {
panic(err)
}
str := buf.String()
return strings.Contains(str, keyword)
}

func prepareLogs(c *C, logData []string, fileNames []string) {
writeFile := func(file string, data string) {
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
Expand Down

0 comments on commit 79db6f5

Please sign in to comment.