ddl: add conn and session_alias entry in ddl worker log (#46443)
ref #46071, close #46441
lcwangchao authored Aug 29, 2023
1 parent 9176d20 commit 2fa680d
Showing 8 changed files with 80 additions and 28 deletions.
4 changes: 4 additions & 0 deletions ddl/ddl.go
@@ -1039,6 +1039,10 @@ func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) {
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
// - other: found in history DDL job and return that job error
func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
job.TraceInfo = &model.TraceInfo{
ConnectionID: ctx.GetSessionVars().ConnectionID,
SessionAlias: ctx.GetSessionVars().SessionAlias,
}
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, we merge all the jobs into one pending job.
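DoDDLJob now stamps every job it submits with the connection ID and session alias of the originating session, so the information travels with the job into the DDL worker. The model.TraceInfo struct itself is defined in parser/model and is not part of this diff; judging from how it is constructed here and in the logutil test further down, it presumably looks roughly like the following sketch (JSON tags are assumptions):

```go
// Sketch only: assumed shape of model.TraceInfo, inferred from its usage in
// this commit. The real definition lives in parser/model and may differ.
type TraceInfo struct {
	// ConnectionID is the ID of the connection that submitted the DDL job.
	ConnectionID uint64 `json:"connection_id"`
	// SessionAlias is the user-defined alias of the submitting session.
	SessionAlias string `json:"session_alias"`
}
```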
50 changes: 31 additions & 19 deletions ddl/ddl_worker.go
@@ -140,10 +140,21 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sess.Pool, delRange
delRangeManager: delRangeMgr,
}
worker.addingDDLJobKey = addingDDLJobPrefix + worker.typeStr()
worker.logCtx = logutil.WithKeyValue(context.Background(), "worker", worker.String())
worker.logCtx = logutil.WithFields(context.Background(), zap.String("worker", worker.String()), zap.String("category", "ddl"))
return worker
}

func (w *worker) jobLogger(job *model.Job) *zap.Logger {
logger := logutil.Logger(w.logCtx)
if job != nil {
logger = logutil.LoggerWithTraceInfo(
logger.With(zap.Int64("jobID", job.ID)),
job.TraceInfo,
)
}
return logger
}

func (w *worker) typeStr() string {
var str string
switch w.tp {
@@ -167,7 +178,7 @@ func (w *worker) Close() {
w.sessPool.Put(w.sess.Session())
}
w.wg.Wait()
logutil.Logger(w.logCtx).Info("DDL worker closed", zap.String("category", "ddl"), zap.Duration("take time", time.Since(startTime)))
logutil.Logger(w.logCtx).Info("DDL worker closed", zap.Duration("take time", time.Since(startTime)))
}

func (dc *ddlCtx) asyncNotifyByEtcd(etcdPath string, jobID int64, jobType string) {
@@ -416,7 +427,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e
return nil
}
if kv.ErrEntryTooLarge.Equal(err) {
logutil.Logger(w.logCtx).Warn("update DDL job failed", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
w.jobLogger(job).Warn("update DDL job failed", zap.String("job", job.String()), zap.Error(err))
w.sess.Rollback()
err1 := w.sess.Begin()
if err1 != nil {
@@ -443,7 +454,7 @@ func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error {
})
updateRawArgs := needUpdateRawArgs(job, meetErr)
if !updateRawArgs {
logutil.Logger(w.logCtx).Info("meet something wrong before update DDL job, shouldn't update raw args", zap.String("category", "ddl"),
w.jobLogger(job).Info("meet something wrong before update DDL job, shouldn't update raw args",
zap.String("job", job.String()))
}
return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs))
@@ -601,7 +612,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
}

job.BinlogInfo.FinishedTS = t.StartTS
logutil.Logger(w.logCtx).Info("finish DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Info("finish DDL job", zap.String("job", job.String()))
updateRawArgs := true
if job.Type == model.ActionAddPrimaryKey && !job.IsCancelled() {
// ActionAddPrimaryKey needs to check the warnings information in job.Args.
@@ -825,7 +836,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
if !dbterror.ErrPausedDDLJob.Equal(runJobErr) {
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
// which may act like a deadlock.
logutil.Logger(w.logCtx).Info("run DDL job failed, sleeps a while then retries it.", zap.String("category", "ddl"),
w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
}

@@ -889,8 +900,7 @@ func (w *worker) waitDependencyJobFinished(job *model.Job, cnt *int) {
if job.DependencyID != noneDependencyJob {
intervalCnt := int(3 * time.Second / waitDependencyJobInterval)
if *cnt%intervalCnt == 0 {
logutil.Logger(w.logCtx).Info("DDL job need to wait dependent job, sleeps a while, then retries it.", zap.String("category", "ddl"),
zap.Int64("jobID", job.ID),
w.jobLogger(job).Info("DDL job need to wait dependent job, sleeps a while, then retries it.",
zap.Int64("dependentJobID", job.DependencyID),
zap.Duration("waitTime", waitDependencyJobInterval))
}
@@ -918,15 +928,16 @@ func (w *worker) countForPanic(job *model.Job) {
}
job.ErrorCount++

logger := w.jobLogger(job)
// Load global DDL variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("load DDL global variable failed", zap.String("category", "ddl"), zap.Error(err1))
logger.Error("load DDL global variable failed", zap.Error(err1))
}
errorCount := variable.GetDDLErrorCountLimit()

if job.ErrorCount > errorCount {
msg := fmt.Sprintf("panic in handling DDL logic and error count beyond the limitation %d, cancelled", errorCount)
logutil.Logger(w.logCtx).Warn(msg)
logger.Warn(msg)
job.Error = toTError(errors.New(msg))
job.State = model.JobStateCancelled
}
@@ -937,32 +948,33 @@ func (w *worker) countForError(err error, job *model.Job) error {
job.Error = toTError(err)
job.ErrorCount++

logger := w.jobLogger(job)
// If job is cancelled, we shouldn't return an error and shouldn't load DDL variables.
if job.State == model.JobStateCancelled {
logutil.Logger(w.logCtx).Info("DDL job is cancelled normally", zap.String("category", "ddl"), zap.Error(err))
logger.Info("DDL job is cancelled normally", zap.Error(err))
return nil
}
logutil.Logger(w.logCtx).Warn("run DDL job error", zap.String("category", "ddl"), zap.Error(err))
logger.Warn("run DDL job error", zap.Error(err))

// Load global DDL variables.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("load DDL global variable failed", zap.String("category", "ddl"), zap.Error(err1))
logger.Error("load DDL global variable failed", zap.Error(err1))
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && job.IsRollbackable() {
logutil.Logger(w.logCtx).Warn("DDL job error count exceed the limit, cancelling it now", zap.String("category", "ddl"), zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
logger.Warn("DDL job error count exceed the limit, cancelling it now", zap.Int64("errorCountLimit", variable.GetDDLErrorCountLimit()))
job.State = model.JobStateCancelling
}
return err
}

func (w *worker) processJobPausingRequest(d *ddlCtx, job *model.Job) (isRunnable bool, err error) {
if job.IsPaused() {
logutil.Logger(w.logCtx).Debug("paused DDL job ", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Debug("paused DDL job ", zap.String("job", job.String()))
return false, err
}
if job.IsPausing() {
logutil.Logger(w.logCtx).Debug("pausing DDL job ", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Debug("pausing DDL job ", zap.String("job", job.String()))
job.State = model.JobStatePaused
return false, pauseReorgWorkers(w, d, job)
}
@@ -980,7 +992,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
failpoint.Inject("mockPanicInRunDDLJob", func(val failpoint.Value) {})

if job.Type != model.ActionMultiSchemaChange {
logutil.Logger(w.logCtx).Info("run DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Info("run DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
}
timeStart := time.Now()
if job.RealStartTS == 0 {
@@ -990,13 +1002,13 @@
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerRunDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
}()
if job.IsFinished() {
logutil.Logger(w.logCtx).Debug("finish DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Debug("finish DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
return ver, err
}

// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
logutil.Logger(w.logCtx).Debug("cancel DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Debug("cancel DDL job", zap.String("job", job.String()))
return convertJob2RollbackJob(w, d, t, job)
}

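The new jobLogger helper replaces the scattered logutil.Logger(w.logCtx) calls above: it derives a child logger once per job, attaching the job ID and, through LoggerWithTraceInfo, the submitting connection's ID and session alias, so every worker log line for that job carries them automatically. Below is a minimal, self-contained sketch of the same pattern using plain zap; the field names "conn" and "session_alias" are assumptions, since fieldsFromTraceInfo is not shown in this diff.

```go
package main

import "go.uber.org/zap"

type traceInfo struct {
	ConnectionID uint64
	SessionAlias string
}

// jobLogger mirrors the shape of the new helper: start from a base logger,
// always attach the job ID, and attach trace fields only when they are present.
func jobLogger(base *zap.Logger, jobID int64, info *traceInfo) *zap.Logger {
	logger := base.With(zap.Int64("jobID", jobID))
	if info == nil {
		return logger
	}
	return logger.With(
		zap.Uint64("conn", info.ConnectionID),
		zap.String("session_alias", info.SessionAlias),
	)
}

func main() {
	base, _ := zap.NewDevelopment()
	defer base.Sync()

	logger := jobLogger(base, 42, &traceInfo{ConnectionID: 456, SessionAlias: "alias789"})
	// jobID, conn and session_alias are attached to every message on this logger.
	logger.Info("run DDL job")
	logger.Warn("run DDL job error")
}
```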
16 changes: 8 additions & 8 deletions ddl/rollingback.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

@@ -124,7 +123,7 @@ func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Jo
func rollingbackModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
if needNotifyAndStopReorgWorker(job) {
// column type change workers are started. we have to ask them to exit.
logutil.Logger(w.logCtx).Info("run the cancelling DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Info("run the cancelling DDL job", zap.String("job", job.String()))
d.notifyReorgWorkerJobStateChange(job)
// Give this kind of DDL one more round to run; the dbterror.ErrCancelledDDLJob should be fetched from the bottom up.
return w.onModifyColumn(d, t, job)
@@ -240,7 +239,7 @@ func rollingbackDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, e
func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) {
if needNotifyAndStopReorgWorker(job) {
// add index workers are started. need to ask them to exit.
logutil.Logger(w.logCtx).Info("run the cancelling DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Info("run the cancelling DDL job", zap.String("job", job.String()))
d.notifyReorgWorkerJobStateChange(job)
ver, err = w.onCreateIndex(d, t, job, isPK)
} else {
@@ -407,7 +406,7 @@ func rollingbackReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ve

func pauseReorgWorkers(w *worker, d *ddlCtx, job *model.Job) (err error) {
if needNotifyAndStopReorgWorker(job) {
logutil.Logger(w.logCtx).Info("pausing the DDL job", zap.String("category", "ddl"), zap.String("job", job.String()))
w.jobLogger(job).Info("pausing the DDL job", zap.String("job", job.String()))
d.notifyReorgWorkerJobStateChange(job)
}

@@ -466,6 +465,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
err = dbterror.ErrCancelledDDLJob
}

logger := w.jobLogger(job)
if err != nil {
if job.Error == nil {
job.Error = toTError(err)
@@ -485,22 +485,22 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
// job state and args may not be correctly overwritten. The job will be fetched to run with the cancelling
// state again. So we should check the error count here.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("load DDL global variable failed", zap.String("category", "ddl"), zap.Error(err1))
logger.Error("load DDL global variable failed", zap.Error(err1))
}
errorCount := variable.GetDDLErrorCountLimit()
if job.ErrorCount > errorCount {
logutil.Logger(w.logCtx).Warn("rollback DDL job error count exceed the limit, cancelled it now", zap.String("category", "ddl"), zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", errorCount))
logger.Warn("rollback DDL job error count exceed the limit, cancelled it now", zap.Int64("errorCountLimit", errorCount))
job.Error = toTError(errors.Errorf("rollback DDL job error count exceed the limit %d, cancelled it now", errorCount))
job.State = model.JobStateCancelled
}
}

if !(job.State != model.JobStateRollingback && job.State != model.JobStateCancelled) {
logutil.Logger(w.logCtx).Info("the DDL job is cancelled normally", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
logger.Info("the DDL job is cancelled normally", zap.String("job", job.String()), zap.Error(err))
// If job is cancelled, we shouldn't return an error.
return ver, nil
}
logutil.Logger(w.logCtx).Error("run DDL job failed", zap.String("category", "ddl"), zap.String("job", job.String()), zap.Error(err))
logger.Error("run DDL job failed", zap.String("job", job.String()), zap.Error(err))
}

return
4 changes: 4 additions & 0 deletions parser/model/ddl.go
@@ -352,6 +352,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job) Job {
Charset: parentJob.Charset,
Collate: parentJob.Collate,
AdminOperator: parentJob.AdminOperator,
TraceInfo: parentJob.TraceInfo,
}
}

@@ -441,6 +442,9 @@ type Job struct {
// AdminOperator indicates where the Admin command comes, by the TiDB
// itself (AdminCommandBySystem) or by user (AdminCommandByEndUser).
AdminOperator AdminCommandOperator `json:"admin_operator"`

// TraceInfo indicates the information for SQL tracing
TraceInfo *TraceInfo `json:"trace_info"`
}

// FinishTableJob is called when a job is finished.
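Copying TraceInfo in SubJob.ToProxyJob means the sub-jobs of a multi-schema change inherit the connection ID and session alias of their parent job, so their log lines can be correlated with the originating session as well; the new pointer field on Job is serialized along with the rest of the job when it is persisted.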
2 changes: 1 addition & 1 deletion parser/model/ddl_test.go
@@ -50,7 +50,7 @@ func TestJobSize(t *testing.T) {
- SubJob.ToProxyJob()
`
job := model.Job{}
require.Equal(t, 328, int(unsafe.Sizeof(job)), msg)
require.Equal(t, 336, int(unsafe.Sizeof(job)), msg)
}

func TestBackfillMetaCodec(t *testing.T) {
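The expected in-memory size of model.Job grows from 328 to 336 bytes, which is consistent with adding one pointer-sized field (*TraceInfo) on a 64-bit platform.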
13 changes: 13 additions & 0 deletions util/logutil/log.go
@@ -200,6 +200,19 @@ func BgLogger() *zap.Logger {
return log.L()
}

// LoggerWithTraceInfo attaches fields from trace info to logger
func LoggerWithTraceInfo(logger *zap.Logger, info *model.TraceInfo) *zap.Logger {
if logger == nil {
logger = log.L()
}

if fields := fieldsFromTraceInfo(info); len(fields) > 0 {
logger = logger.With(fields...)
}

return logger
}

// WithConnID attaches connId to context.
func WithConnID(ctx context.Context, connID uint64) context.Context {
return WithFields(ctx, zap.Uint64("conn", connID))
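LoggerWithTraceInfo delegates to fieldsFromTraceInfo, which already exists in util/logutil and is therefore not part of this diff. Based on the call site (it takes a *model.TraceInfo and returns a possibly empty slice of zap fields), it presumably looks something like the hypothetical sketch below; the field names and zero-value checks are assumptions.

```go
package logutil

import (
	"github.com/pingcap/tidb/parser/model"
	"go.uber.org/zap"
)

// fieldsFromTraceInfo is a hypothetical sketch of the pre-existing helper,
// not the real implementation: it returns nil for a nil trace info and
// only emits fields that are actually set.
func fieldsFromTraceInfo(info *model.TraceInfo) []zap.Field {
	if info == nil {
		return nil
	}
	fields := make([]zap.Field, 0, 2)
	if info.ConnectionID > 0 {
		fields = append(fields, zap.Uint64("conn", info.ConnectionID))
	}
	if info.SessionAlias != "" {
		fields = append(fields, zap.String("session_alias", info.SessionAlias))
	}
	return fields
}
```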
16 changes: 16 additions & 0 deletions util/logutil/log_test.go
@@ -82,6 +82,22 @@ func TestZapLoggerWithKeys(t *testing.T) {
err = os.Remove(fileCfg.Filename)
require.NoError(t, err)

err = InitLogger(conf)
require.NoError(t, err)
newLogger := LoggerWithTraceInfo(log.L(), &model.TraceInfo{ConnectionID: 456, SessionAlias: "alias789"})
ctx1 = context.WithValue(context.Background(), CtxLogKey, newLogger)
testZapLogger(ctx1, t, fileCfg.Filename, zapLogWithTraceInfoPattern)
err = os.Remove(fileCfg.Filename)
require.NoError(t, err)

err = InitLogger(conf)
require.NoError(t, err)
newLogger = LoggerWithTraceInfo(log.L(), nil)
ctx1 = context.WithValue(context.Background(), CtxLogKey, newLogger)
testZapLogger(ctx1, t, fileCfg.Filename, zapLogWithoutCheckKeyPattern)
err = os.Remove(fileCfg.Filename)
require.NoError(t, err)

err = InitLogger(conf)
require.NoError(t, err)
key := "ctxKey"
3 changes: 3 additions & 0 deletions util/logutil/main_test.go
@@ -22,6 +22,9 @@ import (
)

const (
// zapLogWithoutCheckKeyPattern is used to match the zap log format but do not check some specified key, such as the following log:
// [2019/02/13 15:56:05.385 +08:00] [INFO] [log_test.go:167] ["info message"]["str key"=val] ["int key"=123]
zapLogWithoutCheckKeyPattern = `\[\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d.\d\d\d\ (\+|-)\d\d:\d\d\] \[(FATAL|ERROR|WARN|INFO|DEBUG)\] \[([\w_%!$@.,+~-]+|\\.)+:\d+\] \[.*\] (\[.*=.*\]).*\n`
// zapLogPatern is used to match the zap log format, such as the following log:
// [2019/02/13 15:56:05.385 +08:00] [INFO] [log_test.go:167] ["info message"] [conn=conn1] ["str key"=val] ["int key"=123]
zapLogWithConnIDPattern = `\[\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d.\d\d\d\ (\+|-)\d\d:\d\d\] \[(FATAL|ERROR|WARN|INFO|DEBUG)\] \[([\w_%!$@.,+~-]+|\\.)+:\d+\] \[.*\] \[conn=.*\] (\[.*=.*\]).*\n`
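The log_test.go change above also references zapLogWithTraceInfoPattern, whose definition is not visible in this excerpt. Modelled on zapLogWithConnIDPattern, it would presumably match lines carrying both trace fields; a rough, assumed shape (living in the same const block):

```go
// Assumed shape only; the real constant added by this commit is not shown in the excerpt.
zapLogWithTraceInfoPattern = `\[\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d.\d\d\d\ (\+|-)\d\d:\d\d\] \[(FATAL|ERROR|WARN|INFO|DEBUG)\] \[([\w_%!$@.,+~-]+|\\.)+:\d+\] \[.*\] \[conn=.*\] \[session_alias=.*\] (\[.*=.*\]).*\n`
```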
