*: merge feature/flashback-cluster to master (#37529)
ref #37197
Defined2014 authored Sep 1, 2022
1 parent aa5b7ab commit a04000c
Showing 20 changed files with 10,480 additions and 9,900 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
srcs = [
"backfilling.go",
"callback.go",
"cluster.go",
"column.go",
"constant.go",
"ddl.go",
227 changes: 227 additions & 0 deletions ddl/cluster.go
@@ -0,0 +1,227 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/oracle"
)

var pdScheduleKey = []string{
"hot-region-schedule-limit",
"leader-schedule-limit",
"merge-schedule-limit",
"region-schedule-limit",
"replica-schedule-limit",
}

func closePDSchedule(job *model.Job) error {
if err := savePDSchedule(job); err != nil {
return err
}
saveValue := make(map[string]interface{})
for _, key := range pdScheduleKey {
saveValue[key] = 0
}
return infosync.SetPDScheduleConfig(context.Background(), saveValue)
}

func savePDSchedule(job *model.Job) error {
retValue, err := infosync.GetPDScheduleConfig(context.Background())
if err != nil {
return err
}
saveValue := make(map[string]interface{})
for _, key := range pdScheduleKey {
saveValue[key] = retValue[key]
}
job.Args = append(job.Args, saveValue)
return nil
}

func recoverPDSchedule(pdScheduleParam map[string]interface{}) error {
if pdScheduleParam == nil {
return nil
}
return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
}

// ValidateFlashbackTS validates that flashBackTS is in the range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
// If we fail to calculate currentTS from local time, fall back to getting a timestamp from PD.
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
}
gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
if err != nil {
return err
}

return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func checkFlashbackCluster(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
sess, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(sess)

if err = gcutil.DisableGC(sess); err != nil {
return err
}
if err = closePDSchedule(job); err != nil {
return err
}
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

// If flashbackSchemaVersion is not the same as nowSchemaVersion, DDL has run during [flashbackTS, now).
if flashbackSchemaVersion != nowSchemaVersion {
return errors.Errorf("schema version not same, have done ddl during [flashbackTS, now)")
}

jobs, err := GetAllDDLJobs(sess, t)
if err != nil {
return errors.Trace(err)
}
// If other DDL jobs are in the queue, return an error.
if len(jobs) != 1 {
var otherJob *model.Job
for _, j := range jobs {
if j.ID != job.ID {
otherJob = j
break
}
}
return errors.Errorf("have other ddl jobs(jobID: %d) in queue, can't do flashback", otherJob.ID)
}
return nil
}

// A flashback has 3 different stages:
// 1. Before locking flashbackClusterJobID: check the cluster job ID and lock it.
// 2. Before the reorg starts: validate the timestamp, disable GC, and pause PD scheduling.
// 3. Before the reorg finishes.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
var flashbackTS uint64
if err := job.DecodeArgs(&flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

flashbackJobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return ver, err
}

// Stage 1: check and set FlashbackClusterJobID.
if flashbackJobID == 0 {
err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
return meta.NewMeta(txn).SetFlashbackClusterJobID(job.ID)
})
if err != nil {
job.State = model.JobStateCancelled
}
return ver, errors.Trace(err)
} else if flashbackJobID != job.ID {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
}

// Stage 2: before the reorg starts. SnapshotVer == 0 means the job has not started the reorg yet.
if job.SnapshotVer == 0 {
if err = checkFlashbackCluster(w, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// get the current version for reorganization.
snapVer, err := getValidCurrentVersion(d.store)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.SnapshotVer = snapVer.Ver
return ver, nil
}

job.State = model.JobStateDone
return ver, errors.Trace(err)
}

func finishFlashbackCluster(w *worker, job *model.Job) error {
var flashbackTS uint64
var pdScheduleValue map[string]interface{}
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
return errors.Trace(err)
}

err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return err
}
if jobID == job.ID {
if pdScheduleValue != nil {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
}
if err = enableGC(w); err != nil {
return err
}
err = t.SetFlashbackClusterJobID(0)
if err != nil {
return err
}
}
return nil
})
return err
}
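
The flashbackTS handled above is a TiKV TSO, a uint64 that packs a physical millisecond timestamp with a logical counter. As a minimal, self-contained sketch (not part of this commit), this is how a wall-clock time maps to the uint64 that ValidateFlashbackTS consumes:

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	// Build a TSO from wall-clock time, e.g. "10 minutes ago".
	flashbackTS := oracle.GoTimeToTS(time.Now().Add(-10 * time.Minute))

	// GetTimeFromTS is the inverse; ValidateFlashbackTS uses it to reject
	// a flashback timestamp that lies in the future of currentTS.
	fmt.Println(oracle.GetTimeFromTS(flashbackTS))
}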
1 change: 1 addition & 0 deletions ddl/ddl.go
@@ -135,6 +135,7 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
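A hypothetical caller sketch (not part of this commit, and assuming the usual domain.GetDomain accessor): the statement layer would reach the new interface method through the session's domain.

// Resolve the domain's DDL handle from the session context and submit
// the flashback as an ordinary DDL job (hypothetical call site).
dom := domain.GetDomain(sctx)
if err := dom.DDL().FlashbackCluster(sctx, flashbackTS); err != nil {
	return err
}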
11 changes: 11 additions & 0 deletions ddl/ddl_api.go
@@ -2593,6 +2593,17 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo
}
}

func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error {
job := &model.Job{
Type: model.ActionFlashbackCluster,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{flashbackTS, map[string]interface{}{}},
}
err := d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
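FlashbackCluster seeds job.Args with the timestamp plus an empty map, and finishFlashbackCluster later decodes both positionally. A round-trip sketch of that contract, using the existing model.Job helpers (the timestamp value is arbitrary, for illustration only):

job := &model.Job{
	Type:       model.ActionFlashbackCluster,
	BinlogInfo: &model.HistoryInfo{},
	// Positional args: [0] flashbackTS, [1] saved PD schedule limits.
	Args: []interface{}{uint64(434000000000000000), map[string]interface{}{}},
}
raw, err := job.Encode(true) // true: serialize Args into RawArgs as JSON
if err != nil {
	return err
}
decoded := &model.Job{}
if err := decoded.Decode(raw); err != nil {
	return err
}
var flashbackTS uint64
var pdScheduleValue map[string]interface{}
// Mirrors finishFlashbackCluster: arguments are decoded by position.
if err := decoded.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
	return err
}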
31 changes: 31 additions & 0 deletions ddl/ddl_worker.go
@@ -319,6 +319,13 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err := t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
@@ -383,6 +390,13 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err = t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
@@ -541,6 +555,8 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
switch job.Type {
case model.ActionRecoverTable:
err = finishRecoverTable(w, job)
case model.ActionFlashbackCluster:
err = finishFlashbackCluster(w, job)
case model.ActionCreateTables:
if job.IsCancelled() {
// it may be so large that it cannot be added to the history queue, too
@@ -1094,6 +1110,19 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if job.Type != model.ActionMultiSchemaChange {
logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
}

// Also check flashbackClusterJobID here: some DDL jobs may be added
// between the enqueue-time check and the insert into the DDL job table.
flashbackJobID, err := t.GetFlashbackClusterJobID()
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if flashbackJobID != 0 && flashbackJobID != job.ID {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Can't do ddl job, cluster is flashing back now")
}

timeStart := time.Now()
if job.RealStartTS == 0 {
job.RealStartTS = t.StartTS
@@ -1219,6 +1248,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterCacheTable(d, t, job)
case model.ActionAlterNoCacheTable:
ver, err = onAlterNoCacheTable(d, t, job)
case model.ActionFlashbackCluster:
ver, err = w.onFlashbackCluster(d, t, job)
case model.ActionMultiSchemaChange:
ver, err = onMultiSchemaChange(w, d, t, job)
default:
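Both enqueue paths above repeat the same flashback guard. A small helper could factor it out; the following is a hypothetical sketch, while the commit itself keeps the check inline in both functions:

// checkNotFlashingBack rejects new DDL jobs while a flashback job holds
// the cluster-wide FlashbackClusterJobID (hypothetical helper).
func checkNotFlashingBack(t *meta.Meta) error {
	jobID, err := t.GetFlashbackClusterJobID()
	if err != nil {
		return errors.Trace(err)
	}
	if jobID != 0 {
		return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
	}
	return nil
}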
6 changes: 6 additions & 0 deletions ddl/schematracker/checker.go
@@ -268,6 +268,12 @@ func (d Checker) RecoverTable(ctx sessionctx.Context, recoverInfo *ddl.RecoverIn
panic("implement me")
}

// FlashbackCluster implements the DDL interface.
func (d Checker) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) (err error) {
//TODO implement me
panic("implement me")
}

// DropView implements the DDL interface.
func (d Checker) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
err = d.realDDL.DropView(ctx, stmt)
5 changes: 5 additions & 0 deletions ddl/schematracker/dm_tracker.go
@@ -306,6 +306,11 @@ func (d SchemaTracker) RecoverTable(ctx sessionctx.Context, recoverInfo *ddl.Rec
return nil
}

// FlashbackCluster implements the DDL interface, which is no-op in DM's case.
func (d SchemaTracker) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) (err error) {
return nil
}

// DropView implements the DDL interface.
func (d SchemaTracker) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
notExistTables := make([]string, 0, len(stmt.Tables))
1 change: 1 addition & 0 deletions domain/infosync/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"info.go",
"label_manager.go",
"placement_manager.go",
"schedule_manager.go",
"region.go",
"tiflash_manager.go",
],
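The new schedule_manager.go backs the infosync.GetPDScheduleConfig/SetPDScheduleConfig calls used in ddl/cluster.go. For illustration (a sketch, not code from this commit), pausing PD scheduling amounts to zeroing the five limits listed in pdScheduleKey:

// Zero every scheduling limit so PD moves no regions during the flashback.
// savePDSchedule stashes the old values in job.Args for later restoration.
pause := map[string]interface{}{
	"hot-region-schedule-limit": 0,
	"leader-schedule-limit":     0,
	"merge-schedule-limit":      0,
	"region-schedule-limit":     0,
	"replica-schedule-limit":    0,
}
if err := infosync.SetPDScheduleConfig(context.Background(), pause); err != nil {
	return err
}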