diff --git a/ddl/ddl.go b/ddl/ddl.go
index 9a00e55e0afa7..e914026ff6f59 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -1039,6 +1039,10 @@ func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
 // - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
 // - other: found in history DDL job and return that job error
 func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
+	job.TraceInfo = &model.TraceInfo{
+		ConnectionID: ctx.GetSessionVars().ConnectionID,
+		SessionAlias: ctx.GetSessionVars().SessionAlias,
+	}
 	if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
 		// In multiple schema change, we don't run the job.
 		// Instead, we merge all the jobs into one pending job.
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 2417500a0fe63..0775ebb0ed192 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -140,10 +140,21 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sess.Pool, delRange
 		delRangeManager: delRangeMgr,
 	}
 	worker.addingDDLJobKey = addingDDLJobPrefix + worker.typeStr()
-	worker.logCtx = logutil.WithKeyValue(context.Background(), "worker", worker.String())
+	worker.logCtx = logutil.WithFields(context.Background(), zap.String("worker", worker.String()), zap.String("category", "ddl"))
 	return worker
 }
 
+func (w *worker) jobLogger(job *model.Job) *zap.Logger {
+	logger := logutil.Logger(w.logCtx)
+	if job != nil {
+		logger = logutil.LoggerWithTraceInfo(
+			logger.With(zap.Int64("jobID", job.ID)),
+			job.TraceInfo,
+		)
+	}
+	return logger
+}
+
 func (w *worker) typeStr() string {
 	var str string
 	switch w.tp {
@@ -167,7 +178,7 @@ func (w *worker) Close() {
 		w.sessPool.Put(w.sess.Session())
 	}
 	w.wg.Wait()
-	logutil.Logger(w.logCtx).Info("DDL worker closed", zap.String("category", "ddl"), zap.Duration("take time", time.Since(startTime)))
+	logutil.Logger(w.logCtx).Info("DDL worker closed", zap.Duration("take time", time.Since(startTime)))
 }
 
 func (dc *ddlCtx) asyncNotifyByEtcd(etcdPath string, jobID int64, jobType string) {
@@ -416,7 +427,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e
 		return nil
 	}
 	if kv.ErrEntryTooLarge.Equal(err) {
-		logutil.Logger(w.logCtx).Warn("update DDL job failed", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
+		w.jobLogger(job).Warn("update DDL job failed", zap.String("job", job.String()), zap.Error(err))
 		w.sess.Rollback()
 		err1 := w.sess.Begin()
 		if err1 != nil {
@@ -443,7 +454,7 @@ func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error {
 	})
 	updateRawArgs := needUpdateRawArgs(job, meetErr)
 	if !updateRawArgs {
-		logutil.Logger(w.logCtx).Info("meet something wrong before update DDL job, shouldn't update raw args", zap.String("category", "ddl"),
+		w.jobLogger(job).Info("meet something wrong before update DDL job, shouldn't update raw args",
 			zap.String("job", job.String()))
 	}
 	return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs))
@@ -601,7 +612,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 	}
 	job.BinlogInfo.FinishedTS = t.StartTS
-	logutil.Logger(w.logCtx).Info("finish DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+	w.jobLogger(job).Info("finish DDL job", zap.String("job", job.String()))
 	updateRawArgs := true
 	if job.Type == model.ActionAddPrimaryKey && !job.IsCancelled() {
 		// ActionAddPrimaryKey needs to check the warnings information in job.Args.
@@ -825,7 +836,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
 	if !dbterror.ErrPausedDDLJob.Equal(runJobErr) {
 		// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
 		// which may act like a deadlock.
-		logutil.Logger(w.logCtx).Info("run DDL job failed, sleeps a while then retries it.", zap.String("category", "ddl"),
+		w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.",
 			zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
 	}
 
@@ -889,8 +900,7 @@ func (w *worker) waitDependencyJobFinished(job *model.Job, cnt *int) {
 	if job.DependencyID != noneDependencyJob {
 		intervalCnt := int(3 * time.Second / waitDependencyJobInterval)
 		if *cnt%intervalCnt == 0 {
-			logutil.Logger(w.logCtx).Info("DDL job need to wait dependent job, sleeps a while, then retries it.", zap.String("category", "ddl"),
-				zap.Int64("jobID", job.ID),
+			w.jobLogger(job).Info("DDL job need to wait dependent job, sleeps a while, then retries it.",
 				zap.Int64("dependentJobID", job.DependencyID),
 				zap.Duration("waitTime", waitDependencyJobInterval))
 		}
@@ -918,15 +928,16 @@ func (w *worker) countForPanic(job *model.Job) {
 	}
 	job.ErrorCount++
 
+	logger := w.jobLogger(job)
 	// Load global DDL variables.
 	if err1 := loadDDLVars(w); err1 != nil {
-		logutil.Logger(w.logCtx).Error("load DDL global variable failed", zap.String("category", "ddl"), zap.Error(err1))
+		logger.Error("load DDL global variable failed", zap.Error(err1))
 	}
 	errorCount := variable.GetDDLErrorCountLimit()
 
 	if job.ErrorCount > errorCount {
 		msg := fmt.Sprintf("panic in handling DDL logic and error count beyond the limitation %d, cancelled", errorCount)
-		logutil.Logger(w.logCtx).Warn(msg)
+		logger.Warn(msg)
 		job.Error = toTError(errors.New(msg))
 		job.State = model.JobStateCancelled
 	}
@@ -937,20 +948,21 @@ func (w *worker) countForError(err error, job *model.Job) error {
 	job.Error = toTError(err)
 	job.ErrorCount++
 
+	logger := w.jobLogger(job)
 	// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
 	if job.State == model.JobStateCancelled {
-		logutil.Logger(w.logCtx).Info("DDL job is cancelled normally", zap.String("category", "ddl"), zap.Error(err))
+		logger.Info("DDL job is cancelled normally", zap.Error(err))
 		return nil
 	}
-	logutil.Logger(w.logCtx).Warn("run DDL job error", zap.String("category", "ddl"), zap.Error(err))
+	logger.Warn("run DDL job error", zap.Error(err))
 
 	// Load global DDL variables.
 	if err1 := loadDDLVars(w); err1 != nil {
-		logutil.Logger(w.logCtx).Error("load DDL global variable failed", zap.String("category", "ddl"), zap.Error(err1))
+		logger.Error("load DDL global variable failed", zap.Error(err1))
 	}
 	// Check error limit to avoid falling into an infinite loop.
 	if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && job.IsRollbackable() {
-		logutil.Logger(w.logCtx).Warn("DDL job error count exceed the limit, cancelling it now", zap.String("category", "ddl"), zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
+		logger.Warn("DDL job error count exceed the limit, cancelling it now", zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
 		job.State = model.JobStateCancelling
 	}
 	return err
@@ -958,11 +970,11 @@ func (w *worker) countForError(err error, job *model.Job) error {
 
 func (w *worker) processJobPausingRequest(d *ddlCtx, job *model.Job) (isRunnable bool, err error) {
 	if job.IsPaused() {
-		logutil.Logger(w.logCtx).Debug("paused DDL job ", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Debug("paused DDL job ", zap.String("job", job.String()))
 		return false, err
 	}
 	if job.IsPausing() {
-		logutil.Logger(w.logCtx).Debug("pausing DDL job ", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Debug("pausing DDL job ", zap.String("job", job.String()))
 		job.State = model.JobStatePaused
 		return false, pauseReorgWorkers(w, d, job)
 	}
@@ -980,7 +992,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 	failpoint.Inject("mockPanicInRunDDLJob", func(val failpoint.Value) {})
 
 	if job.Type != model.ActionMultiSchemaChange {
-		logutil.Logger(w.logCtx).Info("run DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Info("run DDL job", zap.String("job", job.String()))
 	}
 	timeStart := time.Now()
 	if job.RealStartTS == 0 {
@@ -990,13 +1002,13 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 		metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerRunDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
 	}()
 	if job.IsFinished() {
-		logutil.Logger(w.logCtx).Debug("finish DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Debug("finish DDL job", zap.String("job", job.String()))
 		return ver, err
 	}
 
 	// The cause of this job state is that the job is cancelled by client.
 	if job.IsCancelling() {
-		logutil.Logger(w.logCtx).Debug("cancel DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Debug("cancel DDL job", zap.String("job", job.String()))
 		return convertJob2RollbackJob(w, d, t, job)
 	}
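The jobLogger helper introduced above is the heart of this change: the worker's static fields live in w.logCtx (newWorker now adds category=ddl there, which is why the per-call zap.String("category", "ddl") arguments disappear), and the per-job fields are layered on top. Below is a standalone sketch of that layering with plain go.uber.org/zap; the "conn" and "session_alias" key names and the worker string are illustrative assumptions, not confirmed by this diff.

package main

import "go.uber.org/zap"

func main() {
	// Base logger: static fields shared by every message from one worker,
	// mirroring what newWorker now stores in w.logCtx.
	base := zap.NewExample().With(
		zap.String("worker", "worker 1, tp general"), // illustrative worker.String() value
		zap.String("category", "ddl"),
	)

	// Per-job logger: jobID plus the trace fields carried in job.TraceInfo,
	// the combination jobLogger builds via logutil.LoggerWithTraceInfo.
	jobLogger := base.With(
		zap.Int64("jobID", 103),
		zap.Uint64("conn", 456),                 // assumed key for TraceInfo.ConnectionID
		zap.String("session_alias", "alias789"), // assumed key for TraceInfo.SessionAlias
	)

	jobLogger.Info("run DDL job")
	// Output (zap's example encoder):
	// {"level":"info","msg":"run DDL job","worker":"worker 1, tp general","category":"ddl","jobID":103,"conn":456,"session_alias":"alias789"}
}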
diff --git a/ddl/rollingback.go b/ddl/rollingback.go
index f6390cae6a63f..47e1a3735d5cb 100644
--- a/ddl/rollingback.go
+++ b/ddl/rollingback.go
@@ -27,7 +27,6 @@ import (
 	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/util/dbterror"
-	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
 )
 
@@ -124,7 +123,7 @@ func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Jo
 func rollingbackModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
 	if needNotifyAndStopReorgWorker(job) {
 		// column type change workers are started. we have to ask them to exit.
-		logutil.Logger(w.logCtx).Info("run the cancelling DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Info("run the cancelling DDL job", zap.String("job", job.String()))
 		d.notifyReorgWorkerJobStateChange(job)
 		// Give the this kind of ddl one more round to run, the dbterror.ErrCancelledDDLJob should be fetched from the bottom up.
 		return w.onModifyColumn(d, t, job)
@@ -240,7 +239,7 @@ func rollingbackDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, e
 func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) {
 	if needNotifyAndStopReorgWorker(job) {
 		// add index workers are started. need to ask them to exit.
-		logutil.Logger(w.logCtx).Info("run the cancelling DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Info("run the cancelling DDL job", zap.String("job", job.String()))
 		d.notifyReorgWorkerJobStateChange(job)
 		ver, err = w.onCreateIndex(d, t, job, isPK)
 	} else {
@@ -407,7 +406,7 @@ func rollingbackReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
 
 func pauseReorgWorkers(w *worker, d *ddlCtx, job *model.Job) (err error) {
 	if needNotifyAndStopReorgWorker(job) {
-		logutil.Logger(w.logCtx).Info("pausing the DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
+		w.jobLogger(job).Info("pausing the DDL job", zap.String("job", job.String()))
 		d.notifyReorgWorkerJobStateChange(job)
 	}
 
@@ -466,6 +465,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
 		err = dbterror.ErrCancelledDDLJob
 	}
 
+	logger := w.jobLogger(job)
 	if err != nil {
 		if job.Error == nil {
 			job.Error = toTError(err)
@@ -485,22 +485,22 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
 		// job state and args may not be correctly overwritten. The job will be fetched to run with the cancelling
 		// state again. So we should check the error count here.
 		if err1 := loadDDLVars(w); err1 != nil {
-			logutil.Logger(w.logCtx).Error("load DDL global variable failed", zap.String("category", "ddl"), zap.Error(err1))
+			logger.Error("load DDL global variable failed", zap.Error(err1))
 		}
 		errorCount := variable.GetDDLErrorCountLimit()
 		if job.ErrorCount > errorCount {
-			logutil.Logger(w.logCtx).Warn("rollback DDL job error count exceed the limit, cancelled it now", zap.String("category", "ddl"), zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", errorCount))
+			logger.Warn("rollback DDL job error count exceed the limit, cancelled it now", zap.Int64("errorCountLimit", errorCount))
 			job.Error = toTError(errors.Errorf("rollback DDL job error count exceed the limit %d, cancelled it now", errorCount))
 			job.State = model.JobStateCancelled
 		}
 	}
 
 	if !(job.State != model.JobStateRollingback && job.State != model.JobStateCancelled) {
-		logutil.Logger(w.logCtx).Info("the DDL job is cancelled normally", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
+		logger.Info("the DDL job is cancelled normally", zap.String("job", job.String()), zap.Error(err))
 		// If job is cancelled, we shouldn't return an error.
 		return ver, nil
 	}
-	logutil.Logger(w.logCtx).Error("run DDL job failed", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
+	logger.Error("run DDL job failed", zap.String("job", job.String()), zap.Error(err))
 
 	return
diff --git a/parser/model/ddl.go b/parser/model/ddl.go
index 3ddbbde13a183..71ec23d4272b7 100644
--- a/parser/model/ddl.go
+++ b/parser/model/ddl.go
@@ -352,6 +352,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job) Job {
 		Charset:         parentJob.Charset,
 		Collate:         parentJob.Collate,
 		AdminOperator:   parentJob.AdminOperator,
+		TraceInfo:       parentJob.TraceInfo,
 	}
 }
 
@@ -441,6 +442,9 @@ type Job struct {
 	// AdminOperator indicates where the Admin command comes, by the TiDB
 	// itself (AdminCommandBySystem) or by user (AdminCommandByEndUser).
 	AdminOperator AdminCommandOperator `json:"admin_operator"`
+
+	// TraceInfo indicates the information for SQL tracing
+	TraceInfo *TraceInfo `json:"trace_info"`
 }
 
 // FinishTableJob is called when a job is finished.
diff --git a/parser/model/ddl_test.go b/parser/model/ddl_test.go
index 6f7d0a4bbe5e5..b0d53708cc851 100644
--- a/parser/model/ddl_test.go
+++ b/parser/model/ddl_test.go
@@ -50,7 +50,7 @@ func TestJobSize(t *testing.T) {
 	- SubJob.ToProxyJob()
 	`
 	job := model.Job{}
-	require.Equal(t, 328, int(unsafe.Sizeof(job)), msg)
+	require.Equal(t, 336, int(unsafe.Sizeof(job)), msg)
 }
 
 func TestBackfillMetaCodec(t *testing.T) {
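Three small observations on this pair of hunks. ToProxyJob copying TraceInfo means the sub-jobs of a multi-schema change log under the same connection and alias as their parent. The TestJobSize bump from 328 to 336 bytes is exactly one 8-byte pointer, matching the single added field. And since DDL jobs are persisted and shipped between TiDB instances encoded as JSON, the trace_info tag is what lets the submitter's identity survive until whichever node runs the job. A minimal round-trip sketch with hypothetical stand-in types (not the real model package; the TraceInfo field types are inferred from the call sites in this diff):

package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for model.TraceInfo and the relevant slice of model.Job.
type TraceInfo struct {
	ConnectionID uint64 `json:"connection_id"`
	SessionAlias string `json:"session_alias"`
}

type Job struct {
	ID        int64      `json:"id"`
	TraceInfo *TraceInfo `json:"trace_info"`
}

func main() {
	// Encode on the submitting instance...
	raw, _ := json.Marshal(Job{ID: 103, TraceInfo: &TraceInfo{ConnectionID: 456, SessionAlias: "alias789"}})
	fmt.Println(string(raw))
	// {"id":103,"trace_info":{"connection_id":456,"session_alias":"alias789"}}

	// ...decode on the instance that runs the job; the worker can now log conn/alias.
	var job Job
	_ = json.Unmarshal(raw, &job)
	fmt.Println(job.TraceInfo.SessionAlias) // alias789
}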
diff --git a/util/logutil/log.go b/util/logutil/log.go
index b01ea86987fd9..06678c352e066 100644
--- a/util/logutil/log.go
+++ b/util/logutil/log.go
@@ -200,6 +200,19 @@ func BgLogger() *zap.Logger {
 	return log.L()
 }
 
+// LoggerWithTraceInfo attaches fields from trace info to logger
+func LoggerWithTraceInfo(logger *zap.Logger, info *model.TraceInfo) *zap.Logger {
+	if logger == nil {
+		logger = log.L()
+	}
+
+	if fields := fieldsFromTraceInfo(info); len(fields) > 0 {
+		logger = logger.With(fields...)
+	}
+
+	return logger
+}
+
 // WithConnID attaches connId to context.
 func WithConnID(ctx context.Context, connID uint64) context.Context {
 	return WithFields(ctx, zap.Uint64("conn", connID))
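LoggerWithTraceInfo delegates to fieldsFromTraceInfo, which no hunk in this diff shows (it is presumably added elsewhere in the patch). Whatever its exact body, it has to yield an empty field set for a nil TraceInfo, since the nil case in the test below expects a log line with no extra keys. A plausible sketch under those assumptions, written as if it sat next to LoggerWithTraceInfo in util/logutil/log.go:

// Hypothetical reconstruction of fieldsFromTraceInfo (not shown in this diff):
// it returns nil for a nil TraceInfo and skips zero values, so the
// len(fields) > 0 guard in LoggerWithTraceInfo leaves the logger untouched
// when there is nothing to attach. The key names are assumptions.
func fieldsFromTraceInfo(info *model.TraceInfo) []zap.Field {
	if info == nil {
		return nil
	}
	fields := make([]zap.Field, 0, 2)
	if info.ConnectionID != 0 {
		fields = append(fields, zap.Uint64("conn", info.ConnectionID))
	}
	if info.SessionAlias != "" {
		fields = append(fields, zap.String("session_alias", info.SessionAlias))
	}
	return fields
}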
diff --git a/util/logutil/log_test.go b/util/logutil/log_test.go
index 975c80d16200d..9177e1cf0b9dd 100644
--- a/util/logutil/log_test.go
+++ b/util/logutil/log_test.go
@@ -82,6 +82,22 @@ func TestZapLoggerWithKeys(t *testing.T) {
 	err = os.Remove(fileCfg.Filename)
 	require.NoError(t, err)
 
+	err = InitLogger(conf)
+	require.NoError(t, err)
+	newLogger := LoggerWithTraceInfo(log.L(), &model.TraceInfo{ConnectionID: 456, SessionAlias: "alias789"})
+	ctx1 = context.WithValue(context.Background(), CtxLogKey, newLogger)
+	testZapLogger(ctx1, t, fileCfg.Filename, zapLogWithTraceInfoPattern)
+	err = os.Remove(fileCfg.Filename)
+	require.NoError(t, err)
+
+	err = InitLogger(conf)
+	require.NoError(t, err)
+	newLogger = LoggerWithTraceInfo(log.L(), nil)
+	ctx1 = context.WithValue(context.Background(), CtxLogKey, newLogger)
+	testZapLogger(ctx1, t, fileCfg.Filename, zapLogWithoutCheckKeyPattern)
+	err = os.Remove(fileCfg.Filename)
+	require.NoError(t, err)
+
 	err = InitLogger(conf)
 	require.NoError(t, err)
 	key := "ctxKey"
diff --git a/util/logutil/main_test.go b/util/logutil/main_test.go
index 0f6b850e693a9..e4680ab7780c5 100644
--- a/util/logutil/main_test.go
+++ b/util/logutil/main_test.go
@@ -22,6 +22,9 @@ import (
 )
 
 const (
+	// zapLogWithoutCheckKeyPattern is used to match the zap log format but does not check some specified keys, such as the following log:
+	// [2019/02/13 15:56:05.385 +08:00] [INFO] [log_test.go:167] ["info message"] ["str key"=val] ["int key"=123]
+	zapLogWithoutCheckKeyPattern = `\[\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d.\d\d\d\ (\+|-)\d\d:\d\d\] \[(FATAL|ERROR|WARN|INFO|DEBUG)\] \[([\w_%!$@.,+~-]+|\\.)+:\d+\] \[.*\] (\[.*=.*\]).*\n`
 	// zapLogPatern is used to match the zap log format, such as the following log:
 	// [2019/02/13 15:56:05.385 +08:00] [INFO] [log_test.go:167] ["info message"] [conn=conn1] ["str key"=val] ["int key"=123]
 	zapLogWithConnIDPattern = `\[\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d.\d\d\d\ (\+|-)\d\d:\d\d\] \[(FATAL|ERROR|WARN|INFO|DEBUG)\] \[([\w_%!$@.,+~-]+|\\.)+:\d+\] \[.*\] \[conn=.*\] (\[.*=.*\]).*\n`
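One loose end: log_test.go above references zapLogWithTraceInfoPattern, which none of these hunks define; it is presumably added to the same const block outside the shown context. By analogy with zapLogWithConnIDPattern, a guess at its shape (the conn and session_alias key names are assumptions):

	// Hypothetical definition, mirroring zapLogWithConnIDPattern but requiring
	// both the conn and session_alias fields emitted by LoggerWithTraceInfo.
	zapLogWithTraceInfoPattern = `\[\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d.\d\d\d\ (\+|-)\d\d:\d\d\] \[(FATAL|ERROR|WARN|INFO|DEBUG)\] \[([\w_%!$@.,+~-]+|\\.)+:\d+\] \[.*\] \[conn=.*\] \[session_alias=.*\] (\[.*=.*\]).*\n`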