diff --git a/ddl/ddl.go b/ddl/ddl.go index 923d5d3217d7a..7ea6969353547 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -24,6 +24,7 @@ import ( "flag" "fmt" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -756,7 +757,8 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return errors.Trace(err) } - ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true + sessVars := ctx.GetSessionVars() + sessVars.StmtCtx.IsDDLJobInQueue = true // Notice worker that we push a new job and wait the job done. d.asyncNotifyWorker(job) @@ -798,6 +800,27 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { return context.Canceled } + // If the connection being killed, we need to CANCEL the DDL job. + if atomic.LoadUint32(&sessVars.Killed) == 1 { + if sessVars.StmtCtx.DDLJobID != 0 { + sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat. + + err := kv.RunInNewTxn(context.Background(), d.store, true, func(ctx context.Context, txn kv.Transaction) error { + // errs is the error per job, there is only one submitted + // err is the error of the overall task + errs, err := CancelJobs(txn, []int64{jobID}) + if len(errs) > 0 { + logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0])) + } + return err + }) + if err != nil { + logutil.BgLogger().Warn("Kill command could not cancel DDL job", zap.Error(err)) + continue + } + } + } + historyJob, err = d.getHistoryDDLJob(jobID) if err != nil { logutil.BgLogger().Error("[ddl] get history DDL job failed, check again", zap.Error(err)) diff --git a/server/server.go b/server/server.go index dde54399a7cd9..cd9638e8adcce 100644 --- a/server/server.go +++ b/server/server.go @@ -49,7 +49,6 @@ import ( "github.com/blacktear23/go-proxyprotocol" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" @@ -719,24 +718,6 @@ func killConn(conn *clientConn) { cancelFunc := conn.mu.cancelFunc conn.mu.RUnlock() - // If the connection being killed is a DDL Job, - // we need to CANCEL the matching jobID first. - if sessVars.StmtCtx.IsDDLJobInQueue { - jobID := sessVars.StmtCtx.DDLJobID - err := kv.RunInNewTxn(context.Background(), conn.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { - // errs is the error per job, there is only one submitted - // err is the error of the overall task - errs, err := ddl.CancelJobs(txn, []int64{jobID}) - if len(errs) > 0 { - logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0])) - } - return err - }) - if err != nil { - logutil.BgLogger().Warn("could not cancel DDL job", zap.Error(err)) - } - } - if cancelFunc != nil { cancelFunc() } diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index 035e90aec0190..c2dab7582fa5c 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -65,6 +65,7 @@ func (eqh *Handle) Run() { if len(info.Info) == 0 { continue } + costTime := time.Since(info.Time) if !info.ExceedExpensiveTimeThresh && costTime >= time.Second*time.Duration(threshold) && log.GetLevel() <= zapcore.WarnLevel { logExpensiveQuery(costTime, info)