From d84391c67b6577ab832bc97579b468870d9e6a72 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Thu, 3 Mar 2022 12:49:46 +0800 Subject: [PATCH 1/2] cherry pick #32757 to release-5.2 Signed-off-by: ti-srebot --- executor/slow_query.go | 22 ++++++++++----- executor/slow_query_test.go | 54 +++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 7 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index 7817d4b2ab51d..de77835fba5f5 100755 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -24,7 +24,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -411,7 +410,7 @@ func decomposeToSlowLogTasks(logs []slowLogBlock, num int) [][]string { func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) { defer close(e.taskList) - var wg sync.WaitGroup + var wg util.WaitGroupWrapper offset := offset{offset: 0, length: 0} // To limit the num of go routine concurrent := sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency() @@ -441,20 +440,29 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C if e.stats != nil { e.stats.readFile += time.Since(startTime) } + failpoint.Inject("mockReadSlowLogSlow", func(val failpoint.Value) { + if val.(bool) { + signals := ctx.Value("signals").([]chan int) + signals[0] <- 1 + <-signals[1] + } + }) for i := range logs { log := logs[i] t := slowLogTask{} t.resultCh = make(chan parsedSlowLog, 1) start := offset - wg.Add(1) ch <- 1 - e.taskList <- t - go func() { - defer wg.Done() + select { + case <-ctx.Done(): + return + case e.taskList <- t: + } + wg.Run(func() { result, err := e.parseLog(ctx, sctx, log, start) e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{result, err}) <-ch - }() + }) offset.offset = e.fileLine offset.length = 0 select { diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 28861b4e67a5d..263b4290b48e7 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "os" + "runtime/pprof" "strings" "time" @@ -604,7 +605,60 @@ select 9;` } } +<<<<<<< HEAD func prepareLogs(c *C, logData []string, fileNames []string) { +======= +func TestCancelParseSlowLog(t *testing.T) { + fileName := "tidb-slow-2020-02-14T19-04-05.01.log" + slowLog := `# Time: 2019-04-28T15:24:04.309074+08:00 +select * from t;` + prepareLogs(t, []string{slowLog}, []string{fileName}) + defer func() { + removeFiles([]string{fileName}) + }() + sctx := mock.NewContext() + sctx.GetSessionVars().SlowQueryFile = fileName + + retriever, err := newSlowQueryRetriever() + require.NoError(t, err) + var signal1, signal2 = make(chan int, 1), make(chan int, 1) + ctx := context.WithValue(context.Background(), "signals", []chan int{signal1, signal2}) + ctx, cancel := context.WithCancel(ctx) + err = failpoint.Enable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow", "return(true)") + require.NoError(t, err) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow")) + }() + go func() { + _, err := retriever.retrieve(ctx, sctx) + require.Errorf(t, err, "context canceled") + }() + // Wait for parseSlowLog going to add tasks. + <-signal1 + // Cancel the retriever and then dataForSlowLog exits. + cancel() + // Assume that there are already unprocessed tasks. + retriever.taskList <- slowLogTask{} + // Let parseSlowLog continue. + signal2 <- 1 + // parseSlowLog should exit immediately. + time.Sleep(1 * time.Second) + require.False(t, checkGoroutineExists("parseSlowLog")) +} + +func checkGoroutineExists(keyword string) bool { + buf := new(bytes.Buffer) + profile := pprof.Lookup("goroutine") + err := profile.WriteTo(buf, 1) + if err != nil { + panic(err) + } + str := buf.String() + return strings.Contains(str, keyword) +} + +func prepareLogs(t *testing.T, logData []string, fileNames []string) { +>>>>>>> 62ec7468e... executor: fix goroutine leak in querying slow log (#32757) writeFile := func(file string, data string) { f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) c.Assert(err, IsNil) From 6429d86eec06559cf4248e7ef98d35794b8cce04 Mon Sep 17 00:00:00 2001 From: djshow832 <873581766@qq.com> Date: Fri, 15 Apr 2022 14:52:16 +0800 Subject: [PATCH 2/2] fix conflicts --- executor/slow_query.go | 9 ++++++--- executor/slow_query_test.go | 20 ++++++++------------ 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index de77835fba5f5..48c75e996fe1c 100755 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -410,7 +411,7 @@ func decomposeToSlowLogTasks(logs []slowLogBlock, num int) [][]string { func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) { defer close(e.taskList) - var wg util.WaitGroupWrapper + var wg sync.WaitGroup offset := offset{offset: 0, length: 0} // To limit the num of go routine concurrent := sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency() @@ -458,11 +459,13 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C return case e.taskList <- t: } - wg.Run(func() { + wg.Add(1) + go func() { + defer wg.Done() result, err := e.parseLog(ctx, sctx, log, start) e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{result, err}) <-ch - }) + }() offset.offset = e.fileLine offset.length = 0 select { diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 263b4290b48e7..a41a6669a5d40 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -605,14 +605,11 @@ select 9;` } } -<<<<<<< HEAD -func prepareLogs(c *C, logData []string, fileNames []string) { -======= -func TestCancelParseSlowLog(t *testing.T) { +func (s *testExecSuite) TestCancelParseSlowLog(c *C) { fileName := "tidb-slow-2020-02-14T19-04-05.01.log" slowLog := `# Time: 2019-04-28T15:24:04.309074+08:00 select * from t;` - prepareLogs(t, []string{slowLog}, []string{fileName}) + prepareLogs(c, []string{slowLog}, []string{fileName}) defer func() { removeFiles([]string{fileName}) }() @@ -620,18 +617,18 @@ select * from t;` sctx.GetSessionVars().SlowQueryFile = fileName retriever, err := newSlowQueryRetriever() - require.NoError(t, err) + c.Assert(err, IsNil) var signal1, signal2 = make(chan int, 1), make(chan int, 1) ctx := context.WithValue(context.Background(), "signals", []chan int{signal1, signal2}) ctx, cancel := context.WithCancel(ctx) err = failpoint.Enable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow", "return(true)") - require.NoError(t, err) + c.Assert(err, IsNil) defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow")) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow"), IsNil) }() go func() { _, err := retriever.retrieve(ctx, sctx) - require.Errorf(t, err, "context canceled") + c.Assert(err, ErrorMatches, ".*context canceled.*") }() // Wait for parseSlowLog going to add tasks. <-signal1 @@ -643,7 +640,7 @@ select * from t;` signal2 <- 1 // parseSlowLog should exit immediately. time.Sleep(1 * time.Second) - require.False(t, checkGoroutineExists("parseSlowLog")) + c.Assert(checkGoroutineExists("parseSlowLog"), IsFalse) } func checkGoroutineExists(keyword string) bool { @@ -657,8 +654,7 @@ func checkGoroutineExists(keyword string) bool { return strings.Contains(str, keyword) } -func prepareLogs(t *testing.T, logData []string, fileNames []string) { ->>>>>>> 62ec7468e... executor: fix goroutine leak in querying slow log (#32757) +func prepareLogs(c *C, logData []string, fileNames []string) { writeFile := func(file string, data string) { f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) c.Assert(err, IsNil)