diff --git a/executor/slow_query.go b/executor/slow_query.go
index 7817d4b2ab51d..48c75e996fe1c 100755
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -441,14 +441,25 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
 		if e.stats != nil {
 			e.stats.readFile += time.Since(startTime)
 		}
+		failpoint.Inject("mockReadSlowLogSlow", func(val failpoint.Value) {
+			if val.(bool) {
+				signals := ctx.Value("signals").([]chan int)
+				signals[0] <- 1
+				<-signals[1]
+			}
+		})
 		for i := range logs {
 			log := logs[i]
 			t := slowLogTask{}
 			t.resultCh = make(chan parsedSlowLog, 1)
 			start := offset
-			wg.Add(1)
 			ch <- 1
-			e.taskList <- t
+			select {
+			case <-ctx.Done():
+				return
+			case e.taskList <- t:
+			}
+			wg.Add(1)
 			go func() {
 				defer wg.Done()
 				result, err := e.parseLog(ctx, sctx, log, start)
diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go
index 28861b4e67a5d..a41a6669a5d40 100644
--- a/executor/slow_query_test.go
+++ b/executor/slow_query_test.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"os"
+	"runtime/pprof"
 	"strings"
 	"time"
 
@@ -604,6 +605,55 @@ select 9;`
 	}
 }
 
+func (s *testExecSuite) TestCancelParseSlowLog(c *C) {
+	fileName := "tidb-slow-2020-02-14T19-04-05.01.log"
+	slowLog := `# Time: 2019-04-28T15:24:04.309074+08:00
+select * from t;`
+	prepareLogs(c, []string{slowLog}, []string{fileName})
+	defer func() {
+		removeFiles([]string{fileName})
+	}()
+	sctx := mock.NewContext()
+	sctx.GetSessionVars().SlowQueryFile = fileName
+
+	retriever, err := newSlowQueryRetriever()
+	c.Assert(err, IsNil)
+	var signal1, signal2 = make(chan int, 1), make(chan int, 1)
+	ctx := context.WithValue(context.Background(), "signals", []chan int{signal1, signal2})
+	ctx, cancel := context.WithCancel(ctx)
+	err = failpoint.Enable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow", "return(true)")
+	c.Assert(err, IsNil)
+	defer func() {
+		c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockReadSlowLogSlow"), IsNil)
+	}()
+	go func() {
+		_, err := retriever.retrieve(ctx, sctx)
+		c.Assert(err, ErrorMatches, ".*context canceled.*")
+	}()
+	// Wait for parseSlowLog going to add tasks.
+	<-signal1
+	// Cancel the retriever and then dataForSlowLog exits.
+	cancel()
+	// Assume that there are already unprocessed tasks.
+	retriever.taskList <- slowLogTask{}
+	// Let parseSlowLog continue.
+	signal2 <- 1
+	// parseSlowLog should exit immediately.
+	time.Sleep(1 * time.Second)
+	c.Assert(checkGoroutineExists("parseSlowLog"), IsFalse)
+}
+
+func checkGoroutineExists(keyword string) bool {
+	buf := new(bytes.Buffer)
+	profile := pprof.Lookup("goroutine")
+	err := profile.WriteTo(buf, 1)
+	if err != nil {
+		panic(err)
+	}
+	str := buf.String()
+	return strings.Contains(str, keyword)
+}
+
 func prepareLogs(c *C, logData []string, fileNames []string) {
 	writeFile := func(file string, data string) {
 		f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)