Skip to content

Commit

Permalink
executor: fix goroutine leak in querying slow log (#32757)
Browse files Browse the repository at this point in the history
close #32656
  • Loading branch information
djshow832 authored Mar 3, 2022
1 parent 4f072fb commit 62ec746
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
22 changes: 15 additions & 7 deletions executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -413,7 +412,7 @@ func decomposeToSlowLogTasks(logs []slowLogBlock, num int) [][]string {

func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) {
defer close(e.taskList)
var wg sync.WaitGroup
var wg util.WaitGroupWrapper
offset := offset{offset: 0, length: 0}
// To limit the num of go routine
concurrent := sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency()
Expand Down Expand Up @@ -447,20 +446,29 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
if e.stats != nil {
e.stats.readFile += time.Since(startTime)
}
failpoint.Inject("mockReadSlowLogSlow", func(val failpoint.Value) {
if val.(bool) {
signals := ctx.Value("signals").([]chan int)
signals[0] <- 1
<-signals[1]
}
})
for i := range logs {
log := logs[i]
t := slowLogTask{}
t.resultCh = make(chan parsedSlowLog, 1)
start := offset
wg.Add(1)
ch <- 1
e.taskList <- t
go func() {
defer wg.Done()
select {
case <-ctx.Done():
return
case e.taskList <- t:
}
wg.Run(func() {
result, err := e.parseLog(ctx, sctx, log, start)
e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{result, err})
<-ch
}()
})
offset.offset = e.fileLine
offset.length = 0
select {
Expand Down
50 changes: 50 additions & 0 deletions executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"runtime/pprof"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -651,6 +652,55 @@ select 9;`
}
}

// TestCancelParseSlowLog verifies that the parseSlowLog producer goroutine
// exits promptly when the retriever's context is canceled, even while the
// task channel still holds unprocessed tasks (goroutine-leak regression
// test for #32656). The mockReadSlowLogSlow failpoint pauses parseSlowLog
// between reading the file and enqueueing tasks so the test can cancel at
// exactly that point.
func TestCancelParseSlowLog(t *testing.T) {
	fileName := "tidb-slow-2020-02-14T19-04-05.01.log"
	slowLog := `# Time: 2019-04-28T15:24:04.309074+08:00
select * from t;`
	prepareLogs(t, []string{slowLog}, []string{fileName})
	defer func() {
		removeFiles([]string{fileName})
	}()
	sctx := mock.NewContext()
	sctx.GetSessionVars().SlowQueryFile = fileName

	retriever, err := newSlowQueryRetriever()
	require.NoError(t, err)
	// signal1: parseSlowLog -> test ("about to enqueue tasks").
	// signal2: test -> parseSlowLog ("continue"). The string key must match
	// the ctx.Value("signals") lookup inside the failpoint in slow_query.go.
	var signal1, signal2 = make(chan int, 1), make(chan int, 1)
	ctx := context.WithValue(context.Background(), "signals", []chan int{signal1, signal2})
	ctx, cancel := context.WithCancel(ctx)
	err = failpoint.Enable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow", "return(true)")
	require.NoError(t, err)
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow"))
	}()
	// Ship the retrieve error back to the test goroutine: require calls
	// t.FailNow, which must only run on the goroutine executing the test.
	retrieveErrCh := make(chan error, 1)
	go func() {
		_, err := retriever.retrieve(ctx, sctx)
		retrieveErrCh <- err
	}()
	// Wait for parseSlowLog going to add tasks.
	<-signal1
	// Cancel the retriever and then dataForSlowLog exits.
	cancel()
	// Assume that there are already unprocessed tasks.
	retriever.taskList <- slowLogTask{}
	// Let parseSlowLog continue.
	signal2 <- 1
	// parseSlowLog should exit immediately.
	time.Sleep(1 * time.Second)
	require.False(t, checkGoroutineExists("parseSlowLog"))
	// retrieve must report the cancellation. The original
	// `require.Errorf(t, err, "context canceled")` only checked err != nil
	// ("context canceled" was the failure message, not an expectation);
	// ErrorContains asserts the error text as intended.
	select {
	case err := <-retrieveErrCh:
		require.ErrorContains(t, err, "context canceled")
	case <-time.After(5 * time.Second):
		t.Fatal("retrieve did not return after the context was canceled")
	}
}

// checkGoroutineExists reports whether the current goroutine profile
// contains keyword anywhere in its text (debug=1 format, which includes
// function names in each stack). It panics if the profile cannot be
// written, since that only happens on programmer error.
func checkGoroutineExists(keyword string) bool {
	var sb strings.Builder
	if err := pprof.Lookup("goroutine").WriteTo(&sb, 1); err != nil {
		panic(err)
	}
	return strings.Contains(sb.String(), keyword)
}

func prepareLogs(t *testing.T, logData []string, fileNames []string) {
writeFile := func(file string, data string) {
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
Expand Down

0 comments on commit 62ec746

Please sign in to comment.