From cb5affbc3cced5e75232c1cd8804431bad5ea5d6 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 4 Jan 2023 15:20:19 +0800 Subject: [PATCH] ddl, parser: Implement the write-reorg state split task related functions, and the related interfaces of backfill worker (#39982) close pingcap/tidb#37123 --- ddl/backfilling.go | 541 ++++++++++++++++++++++++++++++++----- ddl/column.go | 58 ++-- ddl/ddl.go | 22 +- ddl/ddl_worker.go | 16 +- ddl/ddl_workerpool_test.go | 4 +- ddl/index.go | 150 +++++++--- ddl/index_merge_tmp.go | 41 ++- ddl/ingest/engine_mgr.go | 19 +- ddl/job_table.go | 96 ++++--- ddl/job_table_test.go | 196 +++++++++++--- ddl/partition.go | 2 +- ddl/reorg.go | 8 +- executor/BUILD.bazel | 1 + executor/ddl_test.go | 15 + parser/model/ddl.go | 10 +- 15 files changed, 929 insertions(+), 250 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 0f0910e1caf28..a7c23a545208e 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -61,7 +61,13 @@ const ( typeAddIndexMergeTmpWorker backfillerType = 3 // InstanceLease is the instance lease. - InstanceLease = 1 * time.Minute + InstanceLease = 1 * time.Minute + updateInstanceLease = 25 * time.Second + genTaskBatch = 4096 + minGenTaskBatch = 1024 + minDistTaskCnt = 16 + retrySQLTimes = 3 + retrySQLInterval = 500 * time.Millisecond ) func (bT backfillerType) String() string { @@ -107,8 +113,8 @@ func (bj *BackfillJob) AbbrStr() string { bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease) } -// GetOracleTime returns the current time from TS. -func GetOracleTime(se *session) (time.Time, error) { +// GetOracleTimeWithStartTS returns the current time with txn's startTS. +func GetOracleTimeWithStartTS(se *session) (time.Time, error) { txn, err := se.Txn(true) if err != nil { return time.Time{}, err @@ -116,6 +122,15 @@ func GetOracleTime(se *session) (time.Time, error) { return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil } +// GetOracleTime returns the current time from TS. +func GetOracleTime(store kv.Storage) (time.Time, error) { + currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) + if err != nil { + return time.Time{}, errors.Trace(err) + } + return oracle.GetTimeFromTS(currentVer.Ver).UTC(), nil +} + // GetLeaseGoTime returns a types.Time by adding a lease. func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time { leaseTime := currTime.Add(lease) @@ -174,9 +189,35 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time { // Instead, it is divided into batches, each time a kv transaction completes the backfilling // of a partial batch. 
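The lease helpers above (GetOracleTimeWithStartTS, GetOracleTime, GetLeaseGoTime, InstanceLease) anchor a worker's claim on a backfill job to TSO-derived time instead of the local clock, so every TiDB instance judges lease expiry against the same clock. Below is a minimal standalone sketch of that pattern; the TSO layout (physical milliseconds in the high bits, an 18-bit logical counter in the low bits) matches what oracle.GetTimeFromTS decodes, and every name in the sketch is an illustrative stand-in rather than the patch's API.

package main

import (
	"fmt"
	"time"
)

const (
	// PD-style TSO: physical milliseconds shifted left by 18 bits,
	// with a logical counter in the low bits.
	tsoPhysicalShift = 18
	// instanceLease mirrors the InstanceLease constant above.
	instanceLease = time.Minute
)

// timeFromTS decodes the wall-clock part of a TSO, as oracle.GetTimeFromTS does.
func timeFromTS(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> tsoPhysicalShift)).UTC()
}

// leaseDeadline follows the GetLeaseGoTime idea: TSO time plus the lease.
func leaseDeadline(ts uint64) time.Time {
	return timeFromTS(ts).Add(instanceLease)
}

func main() {
	// Hypothetical TSO encoding 2023-01-04 00:00:00 UTC.
	ts := uint64(time.Date(2023, 1, 4, 0, 0, 0, 0, time.UTC).UnixMilli()) << tsoPhysicalShift
	fmt.Println(timeFromTS(ts), leaseDeadline(ts))
}

A worker that falls behind simply stops renewing; once the deadline passes, another instance may claim the job, which is why updateLease below re-reads the store's current version before extending the lease.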
+type backfillCtx struct { + *ddlCtx + reorgTp model.ReorgType + sessCtx sessionctx.Context + schemaName string + table table.Table + batchCnt int +} + +func newBackfillCtx(ctx *ddlCtx, sessCtx sessionctx.Context, reorgTp model.ReorgType, + schemaName string, tbl table.Table) *backfillCtx { + return &backfillCtx{ + ddlCtx: ctx, + sessCtx: sessCtx, + reorgTp: reorgTp, + schemaName: schemaName, + table: tbl, + batchCnt: int(variable.GetDDLReorgBatchSize()), + } +} + type backfiller interface { BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) AddMetricInfo(float64) + GetTask() (*BackfillJob, error) + UpdateTask(bfJob *BackfillJob) error + FinishTask(bfJob *BackfillJob) error + GetCtx() *backfillCtx + String() string } type backfillResult struct { @@ -199,11 +240,26 @@ type backfillTaskContext struct { } type reorgBackfillTask struct { + bfJob *BackfillJob + physicalTable table.PhysicalTable + + // TODO: Remove the following fields after remove the function of run. id int physicalTableID int64 startKey kv.Key endKey kv.Key endInclude bool + jobID int64 + sqlQuery string + priority int +} + +func (r *reorgBackfillTask) getJobID() int64 { + jobID := r.jobID + if r.bfJob != nil { + jobID = r.bfJob.JobID + } + return jobID } func (r *reorgBackfillTask) excludedEndKey() kv.Key { @@ -232,35 +288,49 @@ func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResu } type backfillWorker struct { - id int - reorgInfo *reorgInfo - batchCnt int - sessCtx sessionctx.Context - taskCh chan *reorgBackfillTask - resultCh chan *backfillResult - table table.Table - priority int - tp backfillerType - ctx context.Context - cancel func() + id int + backfiller + taskCh chan *reorgBackfillTask + resultCh chan *backfillResult + ctx context.Context + cancel func() } -func newBackfillWorker(ctx context.Context, sessCtx sessionctx.Context, id int, t table.PhysicalTable, - reorgInfo *reorgInfo, tp backfillerType) *backfillWorker { +func newBackfillWorker(ctx context.Context, id int, bf backfiller) *backfillWorker { bfCtx, cancel := context.WithCancel(ctx) return &backfillWorker{ - id: id, - table: t, - reorgInfo: reorgInfo, - batchCnt: int(variable.GetDDLReorgBatchSize()), - sessCtx: sessCtx, - priority: reorgInfo.Job.Priority, - tp: tp, - ctx: bfCtx, - cancel: cancel, + backfiller: bf, + id: id, + taskCh: make(chan *reorgBackfillTask, 1), + resultCh: make(chan *backfillResult, 1), + ctx: bfCtx, + cancel: cancel, } } +func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey kv.Key) error { + leaseTime, err := GetOracleTime(w.GetCtx().store) + if err != nil { + return err + } + bfJob.CurrKey = nextKey + bfJob.InstanceID = execID + bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) + return w.backfiller.UpdateTask(bfJob) +} + +func (w *backfillWorker) finishJob(bfJob *BackfillJob) error { + bfJob.State = model.JobStateDone + return w.backfiller.FinishTask(bfJob) +} + +func (w *backfillWorker) String() string { + if w.backfiller == nil { + return fmt.Sprintf("worker %d", w.id) + } + return fmt.Sprintf("worker %d, tp %s", w.id, w.backfiller.String()) +} + func (w *backfillWorker) Close() { if w.cancel != nil { w.cancel() @@ -286,17 +356,19 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, addedCount: 0, nextKey: handleRange.startKey, } + batchStartTime := time.Now() lastLogCount := 0 lastLogTime := time.Now() startTime := lastLogTime - rc := d.getReorgCtx(w.reorgInfo.Job) + jobID := 
task.getJobID() + rc := d.getReorgCtx(jobID) for { // Give job chance to be canceled, if we not check it here, // if there is panic in bf.BackfillDataInTxn we will never cancel the job. // Because reorgRecordTask may run a long time, // we should check whether this ddl job is still runnable. - err := d.isReorgRunnable(w.reorgInfo.Job) + err := d.isReorgRunnable(jobID) if err != nil { result.err = err return result @@ -325,9 +397,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, if num := result.scanCount - lastLogCount; num >= 90000 { lastLogCount = result.scanCount logutil.BgLogger().Info("[ddl] backfill worker back fill index", - zap.Int("worker ID", w.id), - zap.Int("added count", result.addedCount), - zap.Int("scan count", result.scanCount), + zap.Int("addedCount", result.addedCount), zap.Int("scanCount", result.scanCount), zap.String("next key", hex.EncodeToString(taskCtx.nextKey)), zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds())) lastLogTime = time.Now() @@ -337,11 +407,24 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, if taskCtx.done { break } + + if task.bfJob != nil { + // TODO: Adjust the updating lease frequency by batch processing time carefully. + if time.Since(batchStartTime) < updateInstanceLease { + continue + } + batchStartTime = time.Now() + if err := w.updateLease(w.GetCtx().uuid, task.bfJob, result.nextKey); err != nil { + logutil.BgLogger().Info("[ddl] backfill worker handle task, update lease failed", zap.Stringer("worker", w), + zap.Stringer("task", task), zap.String("backfill job", task.bfJob.AbbrStr()), zap.Error(err)) + result.err = err + return result + } + } } logutil.BgLogger().Info("[ddl] backfill worker finish task", - zap.Stringer("type", w.tp), - zap.Int("worker ID", w.id), - zap.String("task", task.String()), + zap.Stringer("worker", w), + zap.Stringer("task", task), zap.Int("added count", result.addedCount), zap.Int("scan count", result.scanCount), zap.String("next key", hex.EncodeToString(result.nextKey)), @@ -353,9 +436,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, } func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { - logutil.BgLogger().Info("[ddl] backfill worker start", - zap.Stringer("type", w.tp), - zap.Int("workerID", w.id)) + logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w)) var curTaskID int defer util.Recover(metrics.LabelDDL, "backfillWorker.run", func() { w.resultCh <- &backfillResult{taskID: curTaskID, err: dbterror.ErrReorgPanic} @@ -363,17 +444,17 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { for { if util.HasCancelled(w.ctx) { logutil.BgLogger().Info("[ddl] backfill worker exit on context done", - zap.Stringer("type", w.tp), zap.Int("workerID", w.id)) + zap.Stringer("worker", w), zap.Int("workerID", w.id)) return } task, more := <-w.taskCh if !more { logutil.BgLogger().Info("[ddl] backfill worker exit", - zap.Stringer("type", w.tp), zap.Int("workerID", w.id)) + zap.Stringer("worker", w), zap.Int("workerID", w.id)) return } curTaskID = task.id - d.setDDLLabelForTopSQL(job) + d.setDDLLabelForTopSQL(job.ID, job.Query) logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String())) failpoint.Inject("mockBackfillRunErr", func() { @@ -394,12 +475,12 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { }) // Change the batch size dynamically. 
- w.batchCnt = int(variable.GetDDLReorgBatchSize()) + w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize()) result := w.handleBackfillTask(d, task, bf) w.resultCh <- result if result.err != nil { logutil.BgLogger().Info("[ddl] backfill worker exit on error", - zap.Stringer("type", w.tp), zap.Int("workerID", w.id), zap.Error(result.err)) + zap.Stringer("worker", w), zap.Int("workerID", w.id), zap.Error(result.err)) return } } @@ -499,7 +580,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount nextKey, taskAddedCount, err := waitTaskResults(scheduler, batchTasks, totalAddedCount) elapsedTime := time.Since(startTime) if err == nil { - err = dc.isReorgRunnable(reorgInfo.Job) + err = dc.isReorgRunnable(reorgInfo.Job.ID) } if err != nil { @@ -507,8 +588,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds()) logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed", - zap.ByteString("element type", reorgInfo.currElement.TypeKey), - zap.Int64("element ID", reorgInfo.currElement.ID), + zap.Int64("total added count", *totalAddedCount), zap.String("start key", hex.EncodeToString(startKey)), zap.String("next key", hex.EncodeToString(nextKey)), @@ -526,11 +606,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount } // nextHandle will be updated periodically in runReorgJob, so no need to update it here. - dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey) + dc.getReorgCtx(reorgInfo.Job.ID).setNextKey(nextKey) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds()) logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch", - zap.ByteString("element type", reorgInfo.currElement.TypeKey), - zap.Int64("element ID", reorgInfo.currElement.ID), + zap.Stringer("element", reorgInfo.currElement), zap.Int64("total added count", *totalAddedCount), zap.String("start key", hex.EncodeToString(startKey)), zap.String("next key", hex.EncodeToString(nextKey)), @@ -539,11 +618,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount return nil } -// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. -func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, - totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { - batchTasks := make([]*reorgBackfillTask, 0, backfillTaskChanSize) - reorgInfo := scheduler.reorgInfo +func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask { + batchTasks := make([]*reorgBackfillTask, 0, batch) physicalTableID := reorgInfo.PhysicalTableID var prefix kv.Key if reorgInfo.mergingTmpIdx { @@ -553,14 +629,15 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, } // Build reorg tasks. 
job := reorgInfo.Job + jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID) for i, keyRange := range kvRanges { startKey := keyRange.StartKey endKey := keyRange.EndKey - endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey) + endK, err := getRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, keyRange.StartKey, endKey) if err != nil { - logutil.BgLogger().Info("[ddl] send range task to workers, get reverse key failed", zap.Error(err)) + logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err)) } else { - logutil.BgLogger().Info("[ddl] send range task to workers, change end key", + logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK))) endKey = endK } @@ -571,20 +648,31 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, endKey = prefix.PrefixNext() } + //nolint:forcetypeassert + phyTbl := t.(table.PhysicalTable) task := &reorgBackfillTask{ id: i, + jobID: reorgInfo.Job.ID, physicalTableID: physicalTableID, + physicalTable: phyTbl, + priority: reorgInfo.Priority, startKey: startKey, endKey: endKey, // If the boundaries overlap, we should ignore the preceding endKey. endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} batchTasks = append(batchTasks, task) - if len(batchTasks) >= backfillTaskChanSize { + if len(batchTasks) >= batch { break } } + return batchTasks +} +// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. +func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table, + totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { + batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize) if len(batchTasks) == 0 { return nil, nil } @@ -746,7 +834,9 @@ func (b *backfillScheduler) adjustWorkerSize() error { ) switch b.tp { case typeAddIndexWorker: - idxWorker, err := newAddIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc, job) + backfillCtx := newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl) + idxWorker, err := newAddIndexWorker(b.decodeColMap, i, b.tbl, backfillCtx, + jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) if err != nil { if b.canSkipError(err) { continue @@ -754,18 +844,23 @@ func (b *backfillScheduler) adjustWorkerSize() error { return err } idxWorker.copReqSenderPool = b.copReqSenderPool - worker, runner = idxWorker, idxWorker.backfillWorker + runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker) + worker = idxWorker case typeAddIndexMergeTmpWorker: - tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, b.tbl, reorgInfo, jc) - worker, runner = tmpIdxWorker, tmpIdxWorker.backfillWorker + backfillCtx := newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl) + tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, i, b.tbl, reorgInfo.currElement.ID, jc) + runner = newBackfillWorker(jc.ddlJobCtx, i, tmpIdxWorker) + worker = tmpIdxWorker case typeUpdateColumnWorker: // Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting. 
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true - updateWorker := newUpdateColumnWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) - worker, runner = updateWorker, updateWorker.backfillWorker + updateWorker := newUpdateColumnWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc) + runner = newBackfillWorker(jc.ddlJobCtx, i, updateWorker) + worker = updateWorker case typeCleanUpIndexWorker: - idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc) - worker, runner = idxWorker, idxWorker.backfillWorker + idxWorker := newCleanUpIndexWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc) + runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker) + worker = idxWorker default: return errors.New("unknown backfill type") } @@ -856,7 +951,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic return errors.Trace(err) } - if err := dc.isReorgRunnable(reorgInfo.Job); err != nil { + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { return errors.Trace(err) } if startKey == nil && endKey == nil { @@ -870,7 +965,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic } }) - jc := dc.jobContext(job) + jc := dc.jobContext(job.ID) scheduler := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, decodeColMap, jc) defer scheduler.Close() @@ -946,6 +1041,320 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } +func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool, + batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error { + bJobs = bJobs[:0] + instanceID := "" + if notDistTask { + instanceID = reorgInfo.d.uuid + } + // TODO: Adjust the number of ranges(region) for each task. 
+ for _, task := range batchTasks { + bm := &model.BackfillMeta{ + PhysicalTableID: reorgInfo.PhysicalTableID, + IsUnique: isUnique, + EndInclude: task.endInclude, + ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, + SQLMode: reorgInfo.ReorgMeta.SQLMode, + Location: reorgInfo.ReorgMeta.Location, + JobMeta: &model.JobMeta{ + SchemaID: reorgInfo.Job.SchemaID, + TableID: reorgInfo.Job.TableID, + Query: reorgInfo.Job.Query, + }, + } + bj := &BackfillJob{ + ID: *id, + JobID: reorgInfo.Job.ID, + EleID: reorgInfo.currElement.ID, + EleKey: reorgInfo.currElement.TypeKey, + Tp: bfWorkerType, + State: model.JobStateNone, + InstanceID: instanceID, + CurrKey: task.startKey, + StartKey: task.startKey, + EndKey: task.endKey, + Meta: bm, + } + *id++ + bJobs = append(bJobs, bj) + } + if err := AddBackfillJobs(sess, bJobs); err != nil { + return errors.Trace(err) + } + return nil +} + +func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool, + bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { + endKey := reorgInfo.EndKey + isFirstOps := true + bJobs := make([]*BackfillJob, 0, genTaskBatch) + for { + kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch) + if len(batchTasks) == 0 { + break + } + notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) + if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil { + return errors.Trace(err) + } + isFirstOps = false + + remains := kvRanges[len(batchTasks):] + // TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh). 
+ logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", + zap.Int("batchTasksCnt", len(batchTasks)), + zap.Int("totalRegionCnt", len(kvRanges)), + zap.Int("remainRegionCnt", len(remains)), + zap.String("startHandle", hex.EncodeToString(startKey)), + zap.String("endHandle", hex.EncodeToString(endKey))) + + if len(remains) == 0 { + break + } + + for { + bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if bJobCnt < minGenTaskBatch { + break + } + time.Sleep(retrySQLInterval) + } + startKey = remains[0].StartKey + } + return nil +} + +func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { + startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey + if startKey == nil && endKey == nil { + return nil + } + + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + currBackfillJobID := int64(1) + err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if maxBfJob != nil { + startKey = maxBfJob.EndKey + currBackfillJobID = maxBfJob.ID + 1 + } + + var isUnique bool + if bfWorkerType == typeAddIndexWorker { + idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + isUnique = idxInfo.Unique + } + err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID) + if err != nil { + return errors.Trace(err) + } + + var backfillJobFinished bool + jobID := reorgInfo.Job.ID + ticker := time.NewTicker(300 * time.Millisecond) + defer ticker.Stop() + for { + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + select { + case <-ticker.C: + if !backfillJobFinished { + err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + if err != nil { + logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + if bfJob == nil { + backfillJobFinished = true + logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID)) + } + } + if backfillJobFinished { + // TODO: Consider whether these backfill jobs are always out of sync. 
+ isSynced, err := checkJobIsSynced(sess, jobID) + if err != nil { + logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + if isSynced { + logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID)) + return nil + } + } + case <-dc.ctx.Done(): + return dc.ctx.Err() + } + } +} + +func checkJobIsSynced(sess *session, jobID int64) (bool, error) { + var err error + var unsyncedInstanceIDs []string + for i := 0; i < retrySQLTimes; i++ { + unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync") + if err == nil && len(unsyncedInstanceIDs) == 0 { + return true, nil + } + + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", + zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) + time.Sleep(retrySQLInterval) + } + + return false, errors.Trace(err) +} + +func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) { + var bJobs []*BackfillJob + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey) + if err == nil { + break + } + logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + for i := 0; i < retrySQLTimes; i++ { + err = MoveBackfillJobsToHistoryTable(sess, bJobs[0]) + if err == nil { + return errors.Errorf(bJobs[0].Meta.ErrMsg) + } + logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + return errors.Trace(err) +} + +func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { + err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) + if err != nil { + return 0, errors.Trace(err) + } + + backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + jobID, currEleID, currEleKey), "check_backfill_job_count") + if err != nil { + return 0, errors.Trace(err) + } + + return backfillJobCnt, nil +} + +func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) { + var err error + var bJobs []*BackfillJob + descStr := "" + if isDesc { + descStr = "order by id desc" + } + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1", + jobID, currEleID, currEleKey, descStr), "check_backfill_job_state") + if err != nil { + logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) + continue + } + + if len(bJobs) != 0 { + return bJobs[0], nil + } + break + } + return nil, errors.Trace(err) +} + +// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. 
+func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) + if err != nil { + return nil, errors.Trace(err) + } + hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true) + if err != nil { + return nil, errors.Trace(err) + } + + if bfJob == nil { + return hJob, nil + } + if hJob == nil { + return bfJob, nil + } + if bfJob.ID > hJob.ID { + return bfJob, nil + } + return hJob, nil +} + +// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. +func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJob) error { + sess, ok := sessCtx.(*session) + if !ok { + return errors.Errorf("sess ctx:%#v convert session failed", sessCtx) + } + + return runInTxn(sess, func(se *session) error { + // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. + bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job") + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + txn, err := se.txn() + if err != nil { + return errors.Trace(err) + } + startTS := txn.StartTS() + err = RemoveBackfillJob(sess, true, bJobs[0]) + if err == nil { + for _, bj := range bJobs { + bj.State = model.JobStateCancelled + bj.FinishTS = startTS + } + err = AddBackfillHistoryJob(sess, bJobs) + } + logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs))) + return errors.Trace(err) + }) +} + // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) diff --git a/ddl/column.go b/ddl/column.go index 04af3f1714a1d..25ce1f81b9557 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -811,7 +811,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) + reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1059,7 +1059,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error TestReorgGoroutineRunning <- a for { time.Sleep(30 * time.Millisecond) - if w.getReorgCtx(reorgInfo.Job).isReorgCanceled() { + if w.getReorgCtx(reorgInfo.Job.ID).isReorgCanceled() { // Job is cancelled. So it can't be done. 
failpoint.Return(dbterror.ErrCancelledDDLJob) } @@ -1081,7 +1081,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error return errors.Trace(err) } //nolint:forcetypeassert - originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) + originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } @@ -1104,11 +1104,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error // Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle]. if i == startElementOffsetToResetHandle+1 { reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle - w.getReorgCtx(reorgInfo.Job).setNextKey(reorgInfo.StartKey) + w.getReorgCtx(reorgInfo.Job.ID).setNextKey(reorgInfo.StartKey) } // Update the element in the reorgCtx to keep the atomic access for daemon-worker. - w.getReorgCtx(reorgInfo.Job).setCurrentElement(reorgInfo.elements[i+1]) + w.getReorgCtx(reorgInfo.Job.ID).setCurrentElement(reorgInfo.elements[i+1]) // Update the element in the reorgInfo for updating the reorg meta below. reorgInfo.currElement = reorgInfo.elements[i+1] @@ -1132,7 +1132,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error } type updateColumnWorker struct { - *backfillWorker + *backfillCtx oldColInfo *model.ColumnInfo newColInfo *model.ColumnInfo metricCounter prometheus.Counter @@ -1144,11 +1144,10 @@ type updateColumnWorker struct { rowMap map[int64]types.Datum // For SQL Mode and warnings. 
- sqlMode mysql.SQLMode jobContext *JobContext } -func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { +func newUpdateColumnWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query), zap.String("reorgInfo", reorgInfo.String())) @@ -1164,14 +1163,13 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT } rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) return &updateColumnWorker{ - backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeUpdateColumnWorker), - oldColInfo: oldCol, - newColInfo: newCol, - metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())), - rowDecoder: rowDecoder, - rowMap: make(map[int64]types.Datum, len(decodeColMap)), - sqlMode: reorgInfo.ReorgMeta.SQLMode, - jobContext: jc, + backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t), + oldColInfo: oldCol, + newColInfo: newCol, + metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())), + rowDecoder: rowDecoder, + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + jobContext: jc, } } @@ -1179,6 +1177,26 @@ func (w *updateColumnWorker) AddMetricInfo(cnt float64) { w.metricCounter.Add(cnt) } +func (*updateColumnWorker) String() string { + return typeUpdateColumnWorker.String() +} + +func (*updateColumnWorker) GetTask() (*BackfillJob, error) { + panic("[ddl] update column worker GetTask function doesn't implement") +} + +func (*updateColumnWorker) UpdateTask(*BackfillJob) error { + panic("[ddl] update column worker UpdateTask function doesn't implement") +} + +func (*updateColumnWorker) FinishTask(*BackfillJob) error { + panic("[ddl] update column worker FinishTask function doesn't implement") +} + +func (w *updateColumnWorker) GetCtx() *backfillCtx { + return w.backfillCtx +} + type rowRecord struct { key []byte // It's used to lock a record. Record it to reduce the encoding time. vals []byte // It's the record. 
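With updateColumnWorker rebuilt on backfillCtx, it satisfies the new backfiller interface only partially: BackfillDataInTxn still does the transactional work, while the distributed-task methods (GetTask, UpdateTask, FinishTask) are deliberate panicking stubs because column reorg does not yet run through the write-reorg split tasks. A compressed sketch of that contract, using simplified stand-in types rather than the patch's real ones:

package main

import "fmt"

// backfillJob stands in for the patch's BackfillJob row.
type backfillJob struct{ id int64 }

// backfiller is a trimmed version of the interface this patch introduces:
// per-range transactional work plus a distributed-task lifecycle.
type backfiller interface {
	BackfillData() error
	GetTask() (*backfillJob, error)
	UpdateTask(*backfillJob) error
	FinishTask(*backfillJob) error
	String() string
}

// columnWorker joins only the classic owner-driven backfill, so the task
// lifecycle methods panic, mirroring updateColumnWorker's stubs above.
type columnWorker struct{}

func (columnWorker) BackfillData() error { return nil }
func (columnWorker) GetTask() (*backfillJob, error) {
	panic("columnWorker GetTask is not implemented")
}
func (columnWorker) UpdateTask(*backfillJob) error { panic("not implemented") }
func (columnWorker) FinishTask(*backfillJob) error { panic("not implemented") }
func (columnWorker) String() string { return "columnWorker" }

func main() {
	var bf backfiller = columnWorker{}
	fmt.Println(bf.String(), bf.BackfillData())
}

Keeping the stubs as panics rather than silent no-ops makes any accidental routing of a column reorg through the distributed path fail loudly.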
@@ -1204,8 +1222,8 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg taskDone := false var lastAccessedHandle kv.Key oprStartTime := startTime - err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey, - func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { + err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(), + txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0) oprStartTime = oprEndTime @@ -1346,8 +1364,8 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, handleRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } diff --git a/ddl/ddl.go b/ddl/ddl.go index 4cbdcfde9eeef..224f07739355d 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -417,15 +417,15 @@ func (dc *ddlCtx) isOwner() bool { return isOwner } -func (dc *ddlCtx) setDDLLabelForTopSQL(job *model.Job) { +func (dc *ddlCtx) setDDLLabelForTopSQL(jobID int64, jobQuery string) { dc.jobCtx.Lock() defer dc.jobCtx.Unlock() - ctx, exists := dc.jobCtx.jobCtxMap[job.ID] + ctx, exists := dc.jobCtx.jobCtxMap[jobID] if !exists { ctx = NewJobContext() - dc.jobCtx.jobCtxMap[job.ID] = ctx + dc.jobCtx.jobCtxMap[jobID] = ctx } - ctx.setDDLLabelForTopSQL(job) + ctx.setDDLLabelForTopSQL(jobQuery) } func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) { @@ -439,10 +439,10 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) { ctx.setDDLLabelForDiagnosis(job) } -func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger { +func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger { dc.jobCtx.Lock() defer dc.jobCtx.Unlock() - ctx, exists := dc.jobCtx.jobCtxMap[job.ID] + ctx, exists := dc.jobCtx.jobCtxMap[jobID] if !exists { return nil } @@ -455,19 +455,19 @@ func (dc *ddlCtx) removeJobCtx(job *model.Job) { delete(dc.jobCtx.jobCtxMap, job.ID) } -func (dc *ddlCtx) jobContext(job *model.Job) *JobContext { +func (dc *ddlCtx) jobContext(jobID int64) *JobContext { dc.jobCtx.RLock() defer dc.jobCtx.RUnlock() - if jobContext, exists := dc.jobCtx.jobCtxMap[job.ID]; exists { + if jobContext, exists := dc.jobCtx.jobCtxMap[jobID]; exists { return jobContext } return NewJobContext() } -func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx { +func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx { dc.reorgCtx.RLock() defer dc.reorgCtx.RUnlock() - return dc.reorgCtx.reorgCtxMap[job.ID] + return dc.reorgCtx.reorgCtxMap[jobID] } func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx { @@ -492,7 +492,7 @@ func (dc *ddlCtx) removeReorgCtx(job *model.Job) { } func (dc *ddlCtx) notifyReorgCancel(job *model.Job) { - rc := dc.getReorgCtx(job) + rc := 
dc.getReorgCtx(job.ID) if rc == nil { return } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 7843fac34a69e..e6fdafd2f62fb 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -641,14 +641,14 @@ func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) { return true, nil } -func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) { - if !topsqlstate.TopSQLEnabled() || job == nil { +func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) { + if !topsqlstate.TopSQLEnabled() || jobQuery == "" { return } - if job.Query != w.cacheSQL || w.cacheDigest == nil { - w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query) - w.cacheSQL = job.Query + if jobQuery != w.cacheSQL || w.cacheDigest == nil { + w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(jobQuery) + w.cacheSQL = jobQuery w.ddlJobCtx = topsql.AttachAndRegisterSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false) } else { topsql.AttachAndRegisterSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false) @@ -735,10 +735,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { if w.tp == addIdxWorker && job.IsRunning() { txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull) } - w.setDDLLabelForTopSQL(job) + w.setDDLLabelForTopSQL(job.ID, job.Query) w.setDDLSourceForDiagnosis(job) - jobContext := w.jobContext(job) - if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil { + jobContext := w.jobContext(job.ID) + if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } t := meta.NewMeta(txn) diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go index d8768507b8102..123d05abb1d86 100644 --- a/ddl/ddl_workerpool_test.go +++ b/ddl/ddl_workerpool_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/ngaut/pools" - "github.com/pingcap/tidb/parser/model" "github.com/stretchr/testify/require" ) @@ -36,10 +35,9 @@ func TestDDLWorkerPool(t *testing.T) { } func TestBackfillWorkerPool(t *testing.T) { - reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}} f := func() func() (pools.Resource, error) { return func() (pools.Resource, error) { - wk := newBackfillWorker(context.Background(), nil, 1, nil, reorgInfo, typeAddIndexWorker) + wk := newBackfillWorker(context.Background(), 1, nil) return wk, nil } } diff --git a/ddl/index.go b/ddl/index.go index 9690f13a2e6a0..f4e5ca8381ace 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -886,7 +887,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, if err != nil { return false, ver, errors.Trace(err) } - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) + reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1191,9 +1192,10 @@ type indexRecord struct { } type baseIndexWorker struct { - *backfillWorker + *backfillCtx indexes []table.Index + tp backfillerType metricCounter prometheus.Counter // The following attributes are used to reduce memory allocation. 
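The ddl.go hunks above re-key setDDLLabelForTopSQL, jobContext, and getReorgCtx on a bare job ID instead of a *model.Job, so backfill workers that only carry a parent job's ID can reuse the same lookups. A minimal sketch of that ID-keyed, RWMutex-guarded registry shape, with simplified stand-in types:

package main

import (
	"fmt"
	"sync"
)

// jobContext stands in for the per-job state ddlCtx tracks.
type jobContext struct{ query string }

type registry struct {
	mu   sync.RWMutex
	jobs map[int64]*jobContext
}

// get returns the registered context, or a fresh empty one when the job
// is unknown, like ddlCtx.jobContext falling back to NewJobContext().
func (r *registry) get(jobID int64) *jobContext {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if ctx, ok := r.jobs[jobID]; ok {
		return ctx
	}
	return &jobContext{}
}

// setQuery lazily creates the entry, as setDDLLabelForTopSQL does.
func (r *registry) setQuery(jobID int64, query string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	ctx, ok := r.jobs[jobID]
	if !ok {
		ctx = &jobContext{}
		r.jobs[jobID] = ctx
	}
	ctx.query = query
}

func main() {
	r := &registry{jobs: make(map[int64]*jobContext)}
	r.setQuery(1, "alter table t add index idx(c)")
	fmt.Println(r.get(1).query)
}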
@@ -1202,7 +1204,6 @@ type baseIndexWorker struct { rowMap map[int64]types.Datum rowDecoder *decoder.RowDecoder - sqlMode mysql.SQLMode jobContext *JobContext } @@ -1218,24 +1219,23 @@ type addIndexWorker struct { distinctCheckFlags []bool } -func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, - reorgInfo *reorgInfo, jc *JobContext, job *model.Job) (*addIndexWorker, error) { - if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { - logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", reorgInfo.Query), - zap.String("reorgInfo", reorgInfo.String())) - return nil, errors.Errorf("element type is not index, typeKey: %v", reorgInfo.currElement.TypeKey) +func newAddIndexWorker(decodeColMap map[int64]decoder.Column, id int, t table.PhysicalTable, bfCtx *backfillCtx, jc *JobContext, jobID, eleID int64, eleTypeKey []byte) (*addIndexWorker, error) { + if !bytes.Equal(eleTypeKey, meta.IndexElementKey) { + logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", jc.cacheSQL), + zap.Int64("job ID", jobID), zap.ByteString("element type", eleTypeKey), zap.Int64("element ID", eleID)) + return nil, errors.Errorf("element type is not index, typeKey: %v", eleTypeKey) } - indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID) index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo) rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) var lwCtx *ingest.WriterContext - if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { - bc, ok := ingest.LitBackCtxMgr.Load(job.ID) + if bfCtx.reorgTp == model.ReorgTypeLitMerge { + bc, ok := ingest.LitBackCtxMgr.Load(jobID) if !ok { return nil, errors.Trace(errors.New(ingest.LitErrGetBackendFail)) } - ei, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID) + ei, err := bc.EngMgr.Register(bc, jobID, eleID, bfCtx.schemaName, t.Meta().Name.O) if err != nil { return nil, errors.Trace(err) } @@ -1247,14 +1247,13 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable return &addIndexWorker{ baseIndexWorker: baseIndexWorker{ - backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeAddIndexWorker), - indexes: []table.Index{index}, - rowDecoder: rowDecoder, - defaultVals: make([]types.Datum, len(t.WritableCols())), - rowMap: make(map[int64]types.Datum, len(decodeColMap)), - metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("add_idx_rate", reorgInfo.SchemaName, t.Meta().Name.String())), - sqlMode: reorgInfo.ReorgMeta.SQLMode, - jobContext: jc, + backfillCtx: bfCtx, + indexes: []table.Index{index}, + rowDecoder: rowDecoder, + defaultVals: make([]types.Datum, len(t.WritableCols())), + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("add_idx_rate", bfCtx.schemaName, t.Meta().Name.String())), + jobContext: jc, }, index: index, writerCtx: lwCtx, @@ -1265,6 +1264,65 @@ func (w *baseIndexWorker) AddMetricInfo(cnt float64) { w.metricCounter.Add(cnt) } +func (*baseIndexWorker) GetTask() (*BackfillJob, error) { + return nil, nil +} + +func (w *baseIndexWorker) String() string { + return w.tp.String() +} + +func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { + sess, ok := w.backfillCtx.sessCtx.(*session) + if !ok { + 
return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx) + } + + return runInTxn(sess, func(se *session) error { + jobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d", + bfJob.JobID, bfJob.EleID, bfJob.EleKey, bfJob.ID), "update_backfill_task") + if err != nil { + return err + } + if len(jobs) == 0 { + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job") + } + if jobs[0].InstanceID != bfJob.InstanceID { + return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill job %v, want instance ID %s", jobs[0], bfJob.InstanceID)) + } + + currTime, err := GetOracleTimeWithStartTS(se) + if err != nil { + return err + } + bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease) + return updateBackfillJob(sess, BackfillTable, bfJob, "update_backfill_task") + }) +} + +func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error { + sess, ok := w.backfillCtx.sessCtx.(*session) + if !ok { + return errors.Errorf("sess ctx:%#v convert session failed", w.backfillCtx.sessCtx) + } + return runInTxn(sess, func(se *session) error { + txn, err := se.txn() + if err != nil { + return errors.Trace(err) + } + bfJob.FinishTS = txn.StartTS() + err = RemoveBackfillJob(sess, false, bfJob) + if err != nil { + return err + } + return AddBackfillHistoryJob(sess, []*BackfillJob{bfJob}) + }) +} + +func (w *baseIndexWorker) GetCtx() *backfillCtx { + return w.backfillCtx +} + // mockNotOwnerErrOnce uses to make sure `notOwnerErr` only mock error once. var mockNotOwnerErrOnce uint32 @@ -1355,8 +1413,9 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac // taskDone means that the reorged handle is out of taskRange.endHandle. 
taskDone := false oprStartTime := startTime - err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey, - func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { + jobID := taskRange.getJobID() + err := iterateSnapshotKeys(w.GetCtx().jobContext(jobID), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(), txn.StartTS(), + taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in baseIndexWorker fetchRowColVals", 0) oprStartTime = oprEndTime @@ -1516,12 +1575,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC needMergeTmpIdx := w.index.Meta().BackfillState != model.BackfillStateInapplicable oprStartTime := time.Now() + jobID := handleRange.getJobID() ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType()) errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, handleRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(jobID); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } @@ -1635,7 +1695,18 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { } } else { //nolint:forcetypeassert - err = w.addPhysicalTableIndex(t.(table.PhysicalTable), reorgInfo) + phyTbl := t.(table.PhysicalTable) + // TODO: Support typeAddIndexMergeTmpWorker and partitionTable. 
+ isDistReorg := variable.DDLEnableDistributeReorg.Load() + if isDistReorg && !reorgInfo.mergingTmpIdx { + sCtx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(sCtx) + return w.controlWritePhysicalTableRecord(newSession(sCtx), phyTbl, typeAddIndexWorker, reorgInfo) + } + err = w.addPhysicalTableIndex(phyTbl, reorgInfo) } return errors.Trace(err) } @@ -1678,7 +1749,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(reorg.d.jobContext(reorg.Job), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } @@ -1749,7 +1820,7 @@ type cleanUpIndexWorker struct { baseIndexWorker } -func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker { +func newCleanUpIndexWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker { indexes := make([]table.Index, 0, len(t.Indices())) rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) for _, index := range t.Indices() { @@ -1759,14 +1830,13 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT } return &cleanUpIndexWorker{ baseIndexWorker: baseIndexWorker{ - backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeCleanUpIndexWorker), - indexes: indexes, - rowDecoder: rowDecoder, - defaultVals: make([]types.Datum, len(t.WritableCols())), - rowMap: make(map[int64]types.Datum, len(decodeColMap)), - metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("cleanup_idx_rate", reorgInfo.SchemaName, t.Meta().Name.String())), - sqlMode: reorgInfo.ReorgMeta.SQLMode, - jobContext: jc, + backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t), + indexes: indexes, + rowDecoder: rowDecoder, + defaultVals: make([]types.Datum, len(t.WritableCols())), + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("cleanup_idx_rate", reorgInfo.SchemaName, t.Meta().Name.String())), + jobContext: jc, }, } } @@ -1784,8 +1854,8 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, handleRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } @@ -1868,7 +1938,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r if err != nil { return false, errors.Trace(err) } - start, end, err := getTableRange(reorg.d.jobContext(reorg.Job), reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority) + start, end, err := getTableRange(reorg.d.jobContext(reorg.Job.ID), reorg.d, t.GetPartition(pid), 
currentVer.Ver, reorg.Job.Priority) if err != nil { return false, errors.Trace(err) } @@ -1877,7 +1947,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r // Write the reorg info to store so the whole reorganize process can recover from panic. err = reorg.UpdateReorgMeta(reorg.StartKey, w.sessPool) logutil.BgLogger().Info("[ddl] job update reorg info", zap.Int64("jobID", reorg.Job.ID), - zap.ByteString("element type", reorg.currElement.TypeKey), zap.Int64("element ID", reorg.currElement.ID), + zap.Stringer("element", reorg.currElement), zap.Int64("partition table ID", pid), zap.String("start key", hex.EncodeToString(start)), zap.String("end key", hex.EncodeToString(end)), zap.Error(err)) return false, errors.Trace(err) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 5f699b3507e6f..737ed84d33872 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -80,7 +79,7 @@ type temporaryIndexRecord struct { } type mergeIndexWorker struct { - *backfillWorker + *backfillCtx index table.Index @@ -90,15 +89,15 @@ type mergeIndexWorker struct { jobContext *JobContext } -func newMergeTempIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, reorgInfo *reorgInfo, jc *JobContext) *mergeIndexWorker { - indexInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) +func newMergeTempIndexWorker(bfCtx *backfillCtx, id int, t table.PhysicalTable, eleID int64, jc *JobContext) *mergeIndexWorker { + indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID) index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo) return &mergeIndexWorker{ - backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeAddIndexMergeTmpWorker), - index: index, - jobContext: jc, + backfillCtx: bfCtx, + index: index, + jobContext: jc, } } @@ -109,8 +108,8 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 - txn.SetOption(kv.Priority, w.priority) - if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil { + txn.SetOption(kv.Priority, taskRange.priority) + if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) } @@ -163,7 +162,27 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC return } -func (w *mergeIndexWorker) AddMetricInfo(cnt float64) { +func (*mergeIndexWorker) AddMetricInfo(float64) { +} + +func (*mergeIndexWorker) String() string { + return typeAddIndexMergeTmpWorker.String() +} + +func (*mergeIndexWorker) GetTask() (*BackfillJob, error) { + panic("[ddl] merge index worker GetTask function doesn't implement") +} + +func (*mergeIndexWorker) UpdateTask(*BackfillJob) error { + panic("[ddl] merge index worker UpdateTask function doesn't implement") +} + +func (*mergeIndexWorker) FinishTask(*BackfillJob) error { + panic("[ddl] merge index worker FinishTask function doesn't implement") +} + +func (w *mergeIndexWorker) GetCtx() *backfillCtx { + return w.backfillCtx } func (w *mergeIndexWorker) 
fetchTempIndexVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*temporaryIndexRecord, kv.Key, bool, error) { @@ -177,7 +196,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor idxPrefix := w.table.IndexPrefix() var lastKey kv.Key isCommonHandle := w.table.Meta().IsCommonHandle - err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(), + err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, idxPrefix, txn.StartTS(), taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) { oprEndTime := time.Now() logSlowOperations(oprEndTime.Sub(oprStartTime), "iterate temporary index in merge process", 0) diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go index 565d0b30d1ab8..f9b006ec9e369 100644 --- a/ddl/ingest/engine_mgr.go +++ b/ddl/ingest/engine_mgr.go @@ -18,7 +18,6 @@ import ( "fmt" "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" @@ -38,7 +37,7 @@ func (m *engineManager) init(memRoot MemRoot, diskRoot DiskRoot) { } // Register create a new engineInfo and register it to the engineManager. -func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64) (*engineInfo, error) { +func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schemaName, tableName string) (*engineInfo, error) { // Calculate lightning concurrency degree and set memory usage // and pre-allocate memory usage for worker. m.MemRoot.RefreshConsumption() @@ -56,22 +55,22 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int return nil, genEngineAllocMemFailedErr(m.MemRoot, bc.jobID, indexID) } - cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName) - openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID)) + cfg := generateLocalEngineConfig(jobID, schemaName, tableName) + openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) if err != nil { - logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", job.ID), + logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", jobID), zap.Int64("index ID", indexID), zap.Error(err)) return nil, errors.Trace(err) } id := openedEn.GetEngineUUID() - en = NewEngineInfo(bc.ctx, job.ID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot) + en = NewEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot) m.Store(indexID, en) m.MemRoot.Consume(StructSizeEngineInfo) - m.MemRoot.ConsumeWithTag(encodeEngineTag(job.ID, indexID), engineCacheSize) + m.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize) info = LitInfoOpenEngine } else { if en.writerCount+1 > bc.cfg.TikvImporter.RangeConcurrency { - logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", job.ID), + logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.Int64("job ID", jobID), zap.Int64("index ID", indexID), zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency)) return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded") @@ -79,8 +78,8 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int en.writerCount++ info = LitInfoAddWriter } - m.MemRoot.ConsumeWithTag(encodeEngineTag(job.ID, indexID), 
-	logutil.BgLogger().Info(info, zap.Int64("job ID", job.ID),
+	m.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize))
+	logutil.BgLogger().Info(info, zap.Int64("job ID", jobID),
 		zap.Int64("index ID", indexID),
 		zap.Int64("current memory usage", m.MemRoot.CurrentUsage()),
 		zap.Int64("memory limitation", m.MemRoot.MaxMemoryQuota()),
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 06f745506145a..740bb5c0b7da1 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -496,51 +496,56 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {
 	return jobs, nil
 }
 
-// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
-func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {
-	return addBackfillJobs(sess, BackfillTable, backfillJobs)
+func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) (string, error) {
+	sqlPrefix := fmt.Sprintf("insert into mysql.%s(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values", tableName)
+	var sql string
+	for i, bj := range backfillJobs {
+		metaByte, err := bj.Meta.Encode()
+		if err != nil {
+			return "", errors.Trace(err)
+		}
+
+		if i == 0 {
+			sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
+				bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
+				bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, metaByte)
+			continue
+		}
+		sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
+			bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
+			bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, metaByte)
+	}
+	return sql, nil
 }
 
 // AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table.
 func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error {
-	return addBackfillJobs(sess, BackfillHistoryTable, backfillJobs)
+	label := fmt.Sprintf("add_%s_job", BackfillHistoryTable)
+	sql, err := generateInsertBackfillJobSQL(BackfillHistoryTable, backfillJobs)
+	if err != nil {
+		return err
+	}
+	_, err = sess.execute(context.Background(), sql, label)
+	return errors.Trace(err)
 }
 
-// addBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
-func addBackfillJobs(sess *session, tableName string, backfillJobs []*BackfillJob) error {
-	sqlPrefix := fmt.Sprintf(
-		"insert into mysql.%s(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values", tableName)
-	var sql string
-	label := fmt.Sprintf("add_%s_job", tableName)
+// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
+func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {
+	label := fmt.Sprintf("add_%s_job", BackfillTable)
 	// Do runInTxn to get StartTS.
 	return runInTxn(newSession(sess), func(se *session) error {
 		txn, err := se.txn()
 		if err != nil {
 			return errors.Trace(err)
 		}
 		startTS := txn.StartTS()
-		for i, bj := range backfillJobs {
-			if tableName == BackfillTable {
-				bj.StartTS = startTS
-			}
-			if tableName == BackfillHistoryTable {
-				bj.FinishTS = startTS
-			}
-			mateByte, err := bj.Meta.Encode()
-			if err != nil {
-				return errors.Trace(err)
-			}
+		for _, bj := range backfillJobs {
+			bj.StartTS = startTS
+		}
 
-			if i == 0 {
-				sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
-					bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
-					bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, mateByte)
-				continue
-			}
-			sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
-				bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
-				bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, mateByte)
+		sql, err := generateInsertBackfillJobSQL(BackfillTable, backfillJobs)
+		if err != nil {
+			return err
 		}
 		_, err = sess.execute(context.Background(), sql, label)
 		return errors.Trace(err)
@@ -579,7 +584,7 @@ func GetBackfillJobsForOneEle(sess *session, batch int, excludedJobIDs []int64,
 	var bJobs []*BackfillJob
 	s := newSession(sess)
 	err = runInTxn(s, func(se *session) error {
-		currTime, err := GetOracleTime(s)
+		currTime, err := GetOracleTimeWithStartTS(s)
 		if err != nil {
 			return err
 		}
@@ -612,7 +617,7 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, jobID int64, uuid
 	var bJobs []*BackfillJob
 	s := newSession(sess)
 	err := runInTxn(s, func(se *session) error {
-		currTime, err := GetOracleTime(se)
+		currTime, err := GetOracleTimeWithStartTS(se)
 		if err != nil {
 			return err
 		}
@@ -624,7 +629,7 @@ func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, jobID int64, uuid
 			return err
 		}
 		if len(bJobs) == 0 {
-			return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job, lease is timeout")
+			return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job")
 		}
 
 		validLen = 0
@@ -674,6 +679,21 @@ func GetBackfillJobCount(sess *session, tblName, condition string, label string)
 	return int(rows[0].GetInt64(0)), nil
 }
 
+func getUnsyncedInstanceIDs(sess *session, jobID int64, label string) ([]string, error) {
+	sql := fmt.Sprintf("select sum(state = %d) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;",
+		model.JobStateSynced, jobID)
+	rows, err := sess.execute(context.Background(), sql, label)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	instanceIDs := make([]string, 0, len(rows))
+	for _, row := range rows {
+		instanceID := row.GetString(1)
+		instanceIDs = append(instanceIDs, instanceID)
+	}
+	return instanceIDs, nil
+}
+
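A note on the query in getUnsyncedInstanceIDs: it leans on MySQL's boolean-to-integer coercion, so sum(state = N) counts, per exec_id group, how many of that executor's history rows already reached the synced state, and having tmp = 0 keeps only executors with none. A self-contained Go sketch of the same aggregation over in-memory rows (the row type and the state constant are illustrative stand-ins, not TiDB types):

package main

import "fmt"

type historyRow struct {
	execID string
	state  int
}

const jobStateSynced = 6 // illustrative constant standing in for model.JobStateSynced

// unsyncedInstanceIDs mirrors the SQL:
//   select sum(state = ?) as tmp, exec_id ... group by exec_id having tmp = 0
func unsyncedInstanceIDs(rows []historyRow) []string {
	syncedCnt := make(map[string]int)
	for _, r := range rows {
		if _, ok := syncedCnt[r.execID]; !ok {
			syncedCnt[r.execID] = 0 // group by exec_id
		}
		if r.state == jobStateSynced {
			syncedCnt[r.execID]++ // sum(state = synced)
		}
	}
	var ids []string
	for id, cnt := range syncedCnt {
		if cnt == 0 { // having tmp = 0
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	rows := []historyRow{
		{"tidb-1", jobStateSynced},
		{"tidb-2", 0},
		{"tidb-2", 0},
	}
	fmt.Println(unsyncedInstanceIDs(rows)) // [tidb-2]
}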
 // GetBackfillJobs gets the backfill jobs in the tblName table according to condition.
 func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]*BackfillJob, error) {
 	rows, err := sess.execute(context.Background(), fmt.Sprintf("select * from mysql.%s where %s", tblName, condition), label)
@@ -682,7 +702,7 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]
 	}
 	bJobs := make([]*BackfillJob, 0, len(rows))
 	for _, row := range rows {
-		bJob := BackfillJob{
+		bfJob := BackfillJob{
 			ID:    row.GetInt64(0),
 			JobID: row.GetInt64(1),
 			EleID: row.GetInt64(2),
@@ -699,12 +719,12 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]
 			FinishTS: row.GetUint64(13),
 			RowCount: row.GetInt64(14),
 		}
-		bJob.Meta = &model.BackfillMeta{}
-		err = bJob.Meta.Decode(row.GetBytes(15))
+		bfJob.Meta = &model.BackfillMeta{}
+		err = bfJob.Meta.Decode(row.GetBytes(15))
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		bJobs = append(bJobs, &bJob)
+		bJobs = append(bJobs, &bfJob)
 	}
 	return bJobs, nil
 }
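generateInsertBackfillJobSQL above renders a whole batch of backfill jobs into one multi-row INSERT, so a single statement (and a single round-trip) covers the batch. A standalone sketch of that value-list accumulation, using a hypothetical three-column row type instead of the sixteen columns and encoded meta of the real table:

package main

import (
	"fmt"
	"strings"
)

// row is a simplified stand-in for BackfillJob; only three columns here.
type row struct {
	id    int64
	jobID int64
	key   string
}

// generateInsertSQL builds one multi-row INSERT, mirroring the
// "prefix + (v...), (v...)" accumulation in generateInsertBackfillJobSQL.
func generateInsertSQL(table string, rows []row) string {
	var b strings.Builder
	fmt.Fprintf(&b, "insert into mysql.%s(id, ddl_job_id, curr_key) values", table)
	for i, r := range rows {
		if i > 0 {
			b.WriteString(",")
		}
		fmt.Fprintf(&b, " (%d, %d, '%s')", r.id, r.jobID, r.key)
	}
	return b.String()
}

func main() {
	rows := []row{{0, 1, "a"}, {1, 1, "b"}}
	fmt.Println(generateInsertSQL("tidb_ddl_backfill", rows))
	// insert into mysql.tidb_ddl_backfill(id, ddl_job_id, curr_key) values (0, 1, 'a'), (1, 1, 'b')
}

Batching this way keeps the statement count independent of the batch size; the trade-off is a longer SQL string, which the real code bounds by generating tasks in fixed-size batches.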
diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go
index 8948796e73243..d869dcecc2c0e 100644
--- a/ddl/job_table_test.go
+++ b/ddl/job_table_test.go
@@ -244,18 +244,20 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	d := dom.DDL()
 	se := ddl.NewSession(tk.Session())
 
-	jobID1 := int64(2)
-	jobID2 := int64(3)
-	eleID1 := int64(4)
-	eleID2 := int64(5)
+	jobID1 := int64(1)
+	jobID2 := int64(2)
+	eleID1 := int64(11)
+	eleID2 := int64(22)
+	eleID3 := int64(33)
 	uuid := d.GetID()
+	eleKey := meta.IndexElementKey
 	instanceLease := ddl.InstanceLease
 	// test no backfill job
 	bJobs, err := ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID1, jobID2}, instanceLease)
 	require.NoError(t, err)
 	require.Nil(t, bJobs)
 	bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease)
-	require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job, lease is timeout").Error())
+	require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error())
 	require.Nil(t, bJobs)
 	allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
 		jobID1, eleID2, meta.IndexElementKey), "check_backfill_job_count")
@@ -263,10 +265,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	require.Equal(t, allCnt, 0)
 	// Test some backfill jobs, add backfill jobs to the table.
 	cnt := 2
-	bjTestCases := make([]*ddl.BackfillJob, 0, cnt*2)
+	bjTestCases := make([]*ddl.BackfillJob, 0, cnt*3)
 	bJobs1 := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, cnt, "alter table add index idx(a)")
 	bJobs2 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID2, cnt, "alter table add index idx(b)")
-	bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID1, cnt, "alter table add index idx(c)")
+	bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID3, cnt, "alter table add index idx(c)")
 	bjTestCases = append(bjTestCases, bJobs1...)
 	bjTestCases = append(bjTestCases, bJobs2...)
 	bjTestCases = append(bjTestCases, bJobs3...)
@@ -277,37 +279,37 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	// 1     jobID1   eleID1   ""
 	// 0     jobID2   eleID2   ""
 	// 1     jobID2   eleID2   ""
-	// 0     jobID2   eleID1   ""
-	// 1     jobID2   eleID1   ""
+	// 0     jobID2   eleID3   ""
+	// 1     jobID2   eleID3   ""
 	require.NoError(t, err)
 	// test get some backfill jobs
 	bJobs, err = ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID2 - 1, jobID2 + 1}, instanceLease)
 	require.NoError(t, err)
 	require.Len(t, bJobs, 1)
-	expectJob := bjTestCases[4]
+	expectJob := bjTestCases[2]
 	if expectJob.ID != bJobs[0].ID {
-		expectJob = bjTestCases[5]
+		expectJob = bjTestCases[3]
 	}
 	require.Equal(t, expectJob, bJobs[0])
-	previousTime, err := ddl.GetOracleTime(se)
+	previousTime, err := ddl.GetOracleTimeWithStartTS(se)
 	require.EqualError(t, err, "[kv:8024]invalid transaction")
 	readInTxn(se, func(sessionctx.Context) {
-		previousTime, err = ddl.GetOracleTime(se)
+		previousTime, err = ddl.GetOracleTimeWithStartTS(se)
 		require.NoError(t, err)
 	})
 	bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, instanceLease)
 	require.NoError(t, err)
 	require.Len(t, bJobs, 1)
-	expectJob = bjTestCases[4]
+	expectJob = bjTestCases[2]
 	if expectJob.ID != bJobs[0].ID {
-		expectJob = bjTestCases[5]
+		expectJob = bjTestCases[3]
 	}
 	expectJob.InstanceID = uuid
 	equalBackfillJob(t, expectJob, bJobs[0], ddl.GetLeaseGoTime(previousTime, instanceLease))
 	var currTime time.Time
 	readInTxn(se, func(sessionctx.Context) {
-		currTime, err = ddl.GetOracleTime(se)
+		currTime, err = ddl.GetOracleTimeWithStartTS(se)
 		require.NoError(t, err)
 	})
 	currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease)
@@ -323,8 +325,8 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	// 1     jobID1   eleID1
 	// 0     jobID2   eleID2
 	// 1     jobID2   eleID2
-	// 0     jobID2   eleID1
-	// 1     jobID2   eleID1
+	// 0     jobID2   eleID3
+	// 1     jobID2   eleID3
 	require.NoError(t, err)
 	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
 	require.NoError(t, err)
@@ -337,8 +339,8 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	// ID    jobID    eleID
 	// ------------------------
 	// 1     jobID1   eleID1
-	// 0     jobID2   eleID1
-	// 1     jobID2   eleID1
+	// 0     jobID2   eleID3
+	// 1     jobID2   eleID3
 	require.NoError(t, err)
 	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
 	require.NoError(t, err)
@@ -361,28 +363,36 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	// ------------------------
 	// 0     jobID2   eleID2
 	readInTxn(se, func(sessionctx.Context) {
-		currTime, err = ddl.GetOracleTime(se)
+		currTime, err = ddl.GetOracleTimeWithStartTS(se)
 		require.NoError(t, err)
 	})
-	condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID1)
+	condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID2)
 	bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj")
 	require.NoError(t, err)
 	require.Len(t, bJobs, 1)
-	require.Greater(t, bJobs[0].FinishTS, uint64(0))
+	require.Equal(t, bJobs[0].FinishTS, uint64(0))
 
-	// test GetInterruptedBackfillJobsForOneEle
-	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey)
+	// test GetMaxBackfillJob and GetInterruptedBackfillJobsForOneEle
+	bjob, err := ddl.GetMaxBackfillJob(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey)
+	require.NoError(t, err)
+	require.Nil(t, bjob)
+	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey)
 	require.NoError(t, err)
 	require.Nil(t, bJobs)
+	err = ddl.AddBackfillJobs(se, bjTestCases)
+	require.NoError(t, err)
 	// ID    jobID    eleID
 	// ------------------------
 	// 0     jobID1   eleID1
 	// 1     jobID1   eleID1
 	// 0     jobID2   eleID2
 	// 1     jobID2   eleID2
-	err = ddl.AddBackfillJobs(se, bjTestCases)
+	// 0     jobID2   eleID3
+	// 1     jobID2   eleID3
+	bjob, err = ddl.GetMaxBackfillJob(se, jobID2, eleID2, eleKey)
 	require.NoError(t, err)
-	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey)
+	require.Equal(t, bJobs2[1], bjob)
+	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey)
 	require.NoError(t, err)
 	require.Nil(t, bJobs)
 	bJobs1[0].State = model.JobStateRollingback
@@ -390,6 +400,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	bJobs1[0].InstanceID = uuid
 	bJobs1[1].State = model.JobStateCancelling
 	bJobs1[1].ID = 3
+	bJobs1[1].Meta.ErrMsg = "errMsg"
 	err = ddl.AddBackfillJobs(se, bJobs1)
 	require.NoError(t, err)
 	// ID    jobID    eleID    state
@@ -398,19 +409,134 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	// 1     jobID1   eleID1   JobStateNone
 	// 0     jobID2   eleID2   JobStateNone
 	// 1     jobID2   eleID2   JobStateNone
-	// 0     jobID2   eleID1   JobStateNone
-	// 1     jobID2   eleID1   JobStateNone
+	// 0     jobID2   eleID3   JobStateNone
+	// 1     jobID2   eleID3   JobStateNone
 	// 2     jobID1   eleID1   JobStateRollingback
 	// 3     jobID1   eleID1   JobStateCancelling
-	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey)
+	bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey)
+	require.NoError(t, err)
+	require.Equal(t, bJobs1[1], bjob)
+	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey)
 	require.NoError(t, err)
 	require.Len(t, bJobs, 2)
 	equalBackfillJob(t, bJobs1[0], bJobs[0], types.ZeroTime)
 	equalBackfillJob(t, bJobs1[1], bJobs[1], types.ZeroTime)
-	// test the BackfillJob's AbbrStr
-	require.Equal(t, fmt.Sprintf("ID:2, JobID:2, EleID:4, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr())
-	require.Equal(t, "ID:3, JobID:2, EleID:4, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr())
-	require.Equal(t, "ID:0, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[0].AbbrStr())
-	require.Equal(t, "ID:1, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[1].AbbrStr())
+	require.Equal(t, fmt.Sprintf("ID:2, JobID:1, EleID:11, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr())
+	require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr())
+	require.Equal(t, "ID:0, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr())
+	require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr())
+
+	bJobs1[0].State = model.JobStateNone
+	bJobs1[0].ID = 5
+	bJobs1[1].State = model.JobStateNone
+	bJobs1[1].ID = 4
+	err = ddl.AddBackfillHistoryJob(se, bJobs1)
+	require.NoError(t, err)
+	// BackfillTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 0     jobID1   eleID1   JobStateNone
+	// 1     jobID1   eleID1   JobStateNone
+	// 0     jobID2   eleID2   JobStateNone
+	// 1     jobID2   eleID2   JobStateNone
+	// 0     jobID2   eleID3   JobStateNone
+	// 1     jobID2   eleID3   JobStateNone
+	// 2     jobID1   eleID1   JobStateRollingback
+	// 3     jobID1   eleID1   JobStateCancelling
+	//
+	// BackfillHistoryTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 5     jobID1   eleID1   JobStateNone
+	// 4     jobID1   eleID1   JobStateNone
+	bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey)
+	require.NoError(t, err)
+	require.Equal(t, bJobs1[0], bjob)
+	bJobs1[0].ID = 6
+	bJobs1[1].ID = 7
+	err = ddl.AddBackfillJobs(se, bJobs1)
+	require.NoError(t, err)
+	// BackfillTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 0     jobID1   eleID1   JobStateNone
+	// 1     jobID1   eleID1   JobStateNone
+	// 0     jobID2   eleID2   JobStateNone
+	// 1     jobID2   eleID2   JobStateNone
+	// 0     jobID2   eleID3   JobStateNone
+	// 1     jobID2   eleID3   JobStateNone
+	// 2     jobID1   eleID1   JobStateRollingback
+	// 3     jobID1   eleID1   JobStateCancelling
+	// 6     jobID1   eleID1   JobStateNone
+	// 7     jobID1   eleID1   JobStateNone
+	//
+	// BackfillHistoryTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 5     jobID1   eleID1   JobStateNone
+	// 4     jobID1   eleID1   JobStateNone
+	bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey)
+	require.NoError(t, err)
+	require.Equal(t, bJobs1[1], bjob)
+
+	// test MoveBackfillJobsToHistoryTable
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 2)
+	err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0])
+	require.NoError(t, err)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 0)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 2)
+	// BackfillTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 0     jobID1   eleID1   JobStateNone
+	// 1     jobID1   eleID1   JobStateNone
+	// 0     jobID2   eleID2   JobStateNone
+	// 1     jobID2   eleID2   JobStateNone
+	// 2     jobID1   eleID1   JobStateRollingback
+	// 3     jobID1   eleID1   JobStateCancelling
+	// 6     jobID1   eleID1   JobStateNone
+	// 7     jobID1   eleID1   JobStateNone
+	//
+	// BackfillHistoryTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 5     jobID1   eleID1   JobStateNone
+	// 4     jobID1   eleID1   JobStateNone
+	// 0     jobID2   eleID3   JobStateNone
+	// 1     jobID2   eleID3   JobStateNone
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 6)
+	err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0])
+	require.NoError(t, err)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 0)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 8)
+	// BackfillTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 0     jobID2   eleID2   JobStateNone
+	// 1     jobID2   eleID2   JobStateNone
+	//
+	// BackfillHistoryTable
+	// ID    jobID    eleID    state
+	// --------------------------------
+	// 5     jobID1   eleID1   JobStateNone
+	// 4     jobID1   eleID1   JobStateNone
+	// 0     jobID2   eleID3   JobStateNone
+	// 1     jobID2   eleID3   JobStateNone
+	// 0     jobID1   eleID1   JobStateNone
+	// 1     jobID1   eleID1   JobStateNone
+	// 2     jobID1   eleID1   JobStateRollingback
+	// 3     jobID1   eleID1   JobStateCancelling
+	// 6     jobID1   eleID1   JobStateNone
+	// 7     jobID1   eleID1   JobStateNone
 }
diff --git a/ddl/partition.go b/ddl/partition.go
index 37b9f8bb2f5c7..5b67c82c5bf8b 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -1757,7 +1757,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 		}
 	}
 	rh := newReorgHandler(t, w.sess)
-	reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)
+	reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)
 
 	if err != nil || reorgInfo.first {
 		// If we run reorg firstly, we should update the job snapshot version
diff --git a/ddl/reorg.go b/ddl/reorg.go
index a394d1682db82..7912560499344 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -196,7 +196,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
 		}
 	}
 
-	rc := w.getReorgCtx(job)
+	rc := w.getReorgCtx(job.ID)
 	if rc == nil {
 		// This job is cancelling, we should return ErrCancelledDDLJob directly.
 		// Q: Is there any possibility that the job is cancelling and has no reorgCtx?
@@ -289,7 +289,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
 }
 
 func (w *worker) mergeWarningsIntoJob(job *model.Job) {
-	rc := w.getReorgCtx(job)
+	rc := w.getReorgCtx(job.ID)
 	rc.mu.Lock()
 	defer rc.mu.Unlock()
 	partWarnings := rc.mu.warnings
@@ -352,13 +352,13 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
 	return rows[0].GetInt64(0)
 }
 
-func (dc *ddlCtx) isReorgRunnable(job *model.Job) error {
+func (dc *ddlCtx) isReorgRunnable(jobID int64) error {
 	if isChanClosed(dc.ctx.Done()) {
 		// Worker is closed. So it can't do the reorganization.
 		return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
 	}
 
-	if dc.getReorgCtx(job).isReorgCanceled() {
+	if dc.getReorgCtx(jobID).isReorgCanceled() {
 		// Job is cancelled. So it can't be done.
 		return dbterror.ErrCancelledDDLJob
 	}
diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel
index e1ecaca57456f..a3fa2dfc8b7a9 100644
--- a/executor/BUILD.bazel
+++ b/executor/BUILD.bazel
@@ -372,6 +372,7 @@ go_test(
         "//sessionctx/binloginfo",
         "//sessionctx/stmtctx",
         "//sessionctx/variable",
+        "//sessionctx/variable/featuretag/distributereorg",
        "//sessiontxn",
         "//sessiontxn/staleread",
         "//statistics",
diff --git a/executor/ddl_test.go b/executor/ddl_test.go
index 6f4badaa475ed..bb8775a013a30 100644
--- a/executor/ddl_test.go
+++ b/executor/ddl_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/pingcap/tidb/parser/terror"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/sessionctx/variable/featuretag/distributereorg"
 	"github.com/pingcap/tidb/sessiontxn"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/table"
@@ -1306,6 +1307,20 @@ func TestSetDDLErrorCountLimit(t *testing.T) {
 	res.Check(testkit.Rows("100"))
 }
 
+func TestLoadDDLDistributeVars(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	require.Equal(t, variable.DefTiDBDDLEnableDistributeReorg, distributereorg.TiDBEnableDistributeReorg)
+
+	tk.MustGetDBError("set @@global.tidb_ddl_distribute_reorg = invalid_val", variable.ErrWrongValueForVar)
+	require.Equal(t, distributereorg.TiDBEnableDistributeReorg, variable.DDLEnableDistributeReorg.Load())
+	tk.MustExec("set @@global.tidb_ddl_distribute_reorg = 'on'")
+	require.Equal(t, true, variable.DDLEnableDistributeReorg.Load())
+	tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_distribute_reorg = %v", distributereorg.TiDBEnableDistributeReorg))
+	require.Equal(t, distributereorg.TiDBEnableDistributeReorg, variable.DDLEnableDistributeReorg.Load())
+}
+
 // Test issue #9205, fix the precision problem for time type default values
 // See https://github.com/pingcap/tidb/issues/9205 for details
 func TestIssue9205(t *testing.T) {
diff --git a/parser/model/ddl.go b/parser/model/ddl.go
index 4c09f06c29152..46c1e65477d99 100644
--- a/parser/model/ddl.go
+++ b/parser/model/ddl.go
@@ -426,14 +426,18 @@ type JobMeta struct {
 
 // BackfillMeta is meta info of the backfill job.
 type BackfillMeta struct {
-	EndInclude bool   `json:"end_include"`
-	ErrMsg     string `json:"err_msg"`
+	PhysicalTableID int64  `json:"physical_table_id"`
+	IsUnique        bool   `json:"is_unique"`
+	EndInclude      bool   `json:"end_include"`
+	ErrMsg          string `json:"err_msg"`
 
 	SQLMode       mysql.SQLMode                    `json:"sql_mode"`
 	Warnings      map[errors.ErrorID]*terror.Error `json:"warnings"`
 	WarningsCount map[errors.ErrorID]int64         `json:"warnings_count"`
 	Location      *TimeZoneLocation                `json:"location"`
-	*JobMeta `json:"job_meta"`
+	ReorgTp       ReorgType                        `json:"reorg_tp"`
+
+	*JobMeta `json:"job_meta"`
 }
 
 // Encode encodes BackfillMeta with json format.
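BackfillMeta crosses the backfill tables as a JSON blob through its Encode/Decode methods, which is why the new fields above all carry json tags. A self-contained sketch of the same round-trip with a trimmed-down stand-in for the struct (field set reduced for illustration; not the full TiDB type):

package main

import (
	"encoding/json"
	"fmt"
)

// backfillMeta is a trimmed-down stand-in for model.BackfillMeta,
// keeping only a few of the fields added in this patch.
type backfillMeta struct {
	PhysicalTableID int64  `json:"physical_table_id"`
	IsUnique        bool   `json:"is_unique"`
	EndInclude      bool   `json:"end_include"`
	ErrMsg          string `json:"err_msg"`
}

// encode mirrors BackfillMeta.Encode: marshal the struct to JSON bytes.
func (m *backfillMeta) encode() ([]byte, error) {
	return json.Marshal(m)
}

// decode mirrors BackfillMeta.Decode: unmarshal JSON bytes in place.
func (m *backfillMeta) decode(b []byte) error {
	return json.Unmarshal(b, m)
}

func main() {
	in := backfillMeta{PhysicalTableID: 101, IsUnique: true, EndInclude: true}
	b, err := in.encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"physical_table_id":101,"is_unique":true,"end_include":true,"err_msg":""}

	var out backfillMeta
	if err := out.decode(b); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}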