Skip to content

Commit

Permalink
*: fix a data race on TestConnExecutionTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Jul 4, 2022
1 parent e50b943 commit 33f1684
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
25 changes: 24 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -755,7 +756,8 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(err)
}

ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
sessVars := ctx.GetSessionVars()
sessVars.StmtCtx.IsDDLJobInQueue = true

// Notice worker that we push a new job and wait the job done.
d.asyncNotifyWorker(job)
Expand Down Expand Up @@ -797,6 +799,27 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return context.Canceled
}

// If the connection being killed, we need to CANCEL the DDL job.
if atomic.LoadUint32(&sessVars.Killed) == 1 {
if sessVars.StmtCtx.DDLJobID != 0 {
sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat.

err := kv.RunInNewTxn(context.Background(), d.store, true, func(ctx context.Context, txn kv.Transaction) error {
// errs is the error per job, there is only one submitted
// err is the error of the overall task
errs, err := CancelJobs(txn, []int64{jobID})
if len(errs) > 0 {
logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
}
return err
})
if err != nil {
logutil.BgLogger().Warn("Kill command could not cancel DDL job", zap.Error(err))
continue
}
}
}

historyJob, err = d.getHistoryDDLJob(jobID)
if err != nil {
logutil.BgLogger().Error("[ddl] get history DDL job failed, check again", zap.Error(err))
Expand Down
19 changes: 0 additions & 19 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/blacktear23/go-proxyprotocol"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -719,24 +718,6 @@ func killConn(conn *clientConn) {
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()

// If the connection being killed is a DDL Job,
// we need to CANCEL the matching jobID first.
if sessVars.StmtCtx.IsDDLJobInQueue {
jobID := sessVars.StmtCtx.DDLJobID
err := kv.RunInNewTxn(context.Background(), conn.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
// errs is the error per job, there is only one submitted
// err is the error of the overall task
errs, err := ddl.CancelJobs(txn, []int64{jobID})
if len(errs) > 0 {
logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
}
return err
})
if err != nil {
logutil.BgLogger().Warn("could not cancel DDL job", zap.Error(err))
}
}

if cancelFunc != nil {
cancelFunc()
}
Expand Down
6 changes: 6 additions & 0 deletions util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func (eqh *Handle) Run() {
if len(info.Info) == 0 {
continue
}

// Don't kill DDL jobs.
if info.StmtCtx.IsDDLJobInQueue {
continue
}

costTime := time.Since(info.Time)
if !info.ExceedExpensiveTimeThresh && costTime >= time.Second*time.Duration(threshold) && log.GetLevel() <= zapcore.WarnLevel {
logExpensiveQuery(costTime, info)
Expand Down

0 comments on commit 33f1684

Please sign in to comment.