Skip to content

Commit

Permalink
ddl: Support flashback cluster with ddl history (#40209)
Browse files Browse the repository at this point in the history
ref #40026
  • Loading branch information
Defined2014 authored Dec 29, 2022
1 parent 1a7b395 commit 6dff69f
Show file tree
Hide file tree
Showing 14 changed files with 469 additions and 161 deletions.
60 changes: 54 additions & 6 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -147,6 +148,17 @@ func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
return val, nil
}

// isFlashbackSupportedDDLAction reports whether a DDL job of the given action
// type, found in the schema history after the flashback timestamp, still
// allows FLASHBACK CLUSTER to proceed. TiFlash replica changes and placement
// policy/placement rule changes are the only actions rejected here; every
// other action type is accepted.
func isFlashbackSupportedDDLAction(action model.ActionType) bool {
	switch action {
	case model.ActionSetTiFlashReplica,
		model.ActionUpdateTiFlashReplicaStatus,
		model.ActionAlterPlacementPolicy,
		model.ActionAlterTablePlacement,
		model.ActionAlterTablePartitionPlacement,
		model.ActionCreatePlacementPolicy,
		model.ActionDropPlacementPolicy,
		model.ActionModifySchemaDefaultPlacement:
		// These actions are on the deny list for flashback.
		return false
	}
	return true
}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
Expand All @@ -170,19 +182,47 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
return errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
flashbackSnapshotMeta := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSchemaVersion, err := flashbackSnapshotMeta.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now).
flashbackTSString := oracle.GetTimeFromTS(flashbackTS).String()

// Check if there is an upgrade during [flashbackTS, now)
sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString)
rows, err := newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback")
}
sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0))
rows, err = newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.Errorf("Detected TiDB upgrade during [%s, now), can't do flashback", flashbackTSString)
}

// Check whether there was a running DDL job at flashbackTS.
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString)
rows, err = newSession(sess).execute(d.ctx, sql, "check_history_job")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history ddl jobs failed, can't do flashback")
}
if rows[0].GetInt64(0) != 0 {
return errors.Errorf("Detected another DDL job at %s, can't do flashback", flashbackTSString)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we should check all schema diffs during [flashbackTs, now).
for i := flashbackSchemaVersion + 1; i <= nowSchemaVersion; i++ {
diff, err := t.GetSchemaDiff(i)
if err != nil {
return errors.Trace(err)
}
if diff != nil && diff.Type != model.ActionFlashbackCluster {
return errors.Errorf("Detected schema change due to another DDL job during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS))
if diff != nil && !isFlashbackSupportedDDLAction(diff.Type) {
return errors.Errorf("Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", diff.Type.String(), flashbackTSString)
}
}

Expand Down Expand Up @@ -211,7 +251,7 @@ type flashbackID struct {

func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []flashbackID) []flashbackID {
var excluded bool
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") {
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" {
excluded = true
}
flashbackIDs = append(flashbackIDs, flashbackID{
Expand Down Expand Up @@ -270,6 +310,14 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
})
}

// The meta data key ranges.
metaStartKey := tablecodec.EncodeMetaKey(meta.DBkey(0), meta.TableKey(0))
metaEndKey := tablecodec.EncodeMetaKey(meta.DBkey(math.MaxInt64), meta.TableKey(math.MaxInt64))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
})

return keyRanges, nil
}

Expand Down Expand Up @@ -633,7 +681,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
return updateSchemaVersion(d, t, job)
}
return ver, nil
}
Expand Down
14 changes: 8 additions & 6 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ func TestGetFlashbackKeyRanges(t *testing.T) {

kvRanges, err := ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
// The results are 6 key ranges
// 0: (stats_meta,stats_histograms,stats_buckets)
// The results are 8 key ranges
// 0: (stats_meta,stats_histograms,stats_buckets, gc_delete_range)
// 1: (stats_feedback)
// 2: (stats_top_n)
// 3: (stats_extended)
// 4: (stats_fm_sketch)
// 5: (stats_history, stats_meta_history)
// 6: (stats_table_locked)
require.Len(t, kvRanges, 7)
// 7: meta Ranges
require.Len(t, kvRanges, 8)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE employees (" +
Expand All @@ -64,7 +65,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
");")
tk.MustExec("truncate table mysql.analyze_jobs")

// truncate all `stats_` tables, make table ID consecutive.
// truncate all `stats_` and `gc_delete_range` tables, make table ID consecutive.
tk.MustExec("truncate table mysql.stats_meta")
tk.MustExec("truncate table mysql.stats_histograms")
tk.MustExec("truncate table mysql.stats_buckets")
Expand All @@ -75,14 +76,15 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
tk.MustExec("truncate table mysql.stats_history")
tk.MustExec("truncate table mysql.stats_meta_history")
tk.MustExec("truncate table mysql.stats_table_locked")
tk.MustExec("truncate table mysql.gc_delete_range")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 2)
require.Len(t, kvRanges, 3)

tk.MustExec("truncate table test.employees")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 1)
require.Len(t, kvRanges, 2)
}

func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,11 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
OldTableID: recoverTabsInfo[i].TableInfo.ID,
}
}
case model.ActionFlashbackCluster:
diff.TableID = -1
if job.SchemaState == model.StatePublic {
diff.RegenerateSchemaMap = true
}
default:
diff.TableID = job.TableID
}
Expand Down
1 change: 1 addition & 0 deletions ddl/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
Expand Down
39 changes: 39 additions & 0 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -438,6 +439,44 @@ func TestTiFlashDropPartition(t *testing.T) {
CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash")
}

// TestTiFlashFlashbackCluster checks that FLASHBACK CLUSTER is rejected when a
// "set tiflash replica" DDL has been executed after the requested flashback
// timestamp, and that the error message names the offending DDL action type.
func TestTiFlashFlashbackCluster(t *testing.T) {
	s, teardown := createTiFlashContext(t)
	defer teardown()
	tk := testkit.NewTestKit(t, s.store)

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int)")
	tk.MustExec("insert into t values (1), (2), (3)")

	// Take the flashback timestamp before the TiFlash replica DDL runs, so the
	// DDL falls inside [ts, now).
	ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
	require.NoError(t, err)
	flashbackTime := oracle.GetTimeFromTS(ts)

	tk.MustExec("alter table t set tiflash replica 1")
	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable)
	CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "t")

	// Each failpoint is disabled through defer right after it is enabled, so a
	// failing assertion below cannot leave it enabled for later tests.
	injectSafeTS := oracle.GoTimeToTS(flashbackTime.Add(10 * time.Second))
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
	}()
	require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
		fmt.Sprintf("return(%v)", injectSafeTS)))
	defer func() {
		require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
	}()
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
		fmt.Sprintf("return(%v)", injectSafeTS)))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
	}()

	ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s")
	defer func() {
		ChangeGCSafePoint(tk, time.Now(), "true", "10m0s")
	}()

	errorMsg := fmt.Sprintf("[ddl:-1]Detected unsupported DDL job type(%s) during [%s, now), can't do flashback",
		model.ActionSetTiFlashReplica.String(), flashbackTime.String())
	tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTime), errorMsg)
}

func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) {
tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table))
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 1. Not first time bootstrap loading, which needs a full load.
// 2. It is newer than the current one, so it will be "the current one" after this function call.
// 3. There are fewer than 100 diffs.
// 4. None of the schema diffs has the RegenerateSchemaMap flag set.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
Expand Down Expand Up @@ -347,6 +348,9 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
if err != nil {
return nil, nil, err
}
if diff.RegenerateSchemaMap {
return nil, nil, errors.Errorf("Meets a schema diff with RegenerateSchemaMap flag")
}
if canSkipSchemaCheckerDDL(diff.Type) {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64
affected := false
for i, tblID := range item.relatedIDs {
for _, relatedTblID := range tableIDs {
if tblID == relatedTblID {
if tblID == relatedTblID || relatedTblID == -1 {
// if actionType >= 64, the value of left shift equals 0, and it will not impact amend txn
changedTblMap[tblID] |= 1 << item.relatedActions[i]
affected = true
Expand Down
10 changes: 6 additions & 4 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,12 @@ func TestRecoverClusterMeetError(t *testing.T) {
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), errno.ErrPrivilegeCheckFail)
tk.MustExec("drop user 'testflashback'@'localhost';")

// Flashback failed because of ddl history.
tk.MustExec("use test;")
tk.MustExec("create table t(a int);")
tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback")
// update tidb_server_version
nowTS, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
tk.MustExec("update mysql.tidb set VARIABLE_VALUE=VARIABLE_VALUE+1 where VARIABLE_NAME='tidb_server_version'")
errorMsg := fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String())
tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
Expand Down
2 changes: 2 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ type SchemaDiff struct {
OldTableID int64 `json:"old_table_id"`
// OldSchemaID is the schema ID before rename table, only used by rename table DDL.
OldSchemaID int64 `json:"old_schema_id"`
// RegenerateSchemaMap indicates whether the schema map must be rebuilt when this schema diff is applied.
RegenerateSchemaMap bool `json:"regenerate_schema_map"`

AffectedOpts []*AffectedOption `json:"affected_options"`
}
Expand Down
6 changes: 0 additions & 6 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,20 @@ go_test(
srcs = [
"backup_restore_test.go",
"binlog_test.go",
"flashback_test.go",
"main_test.go",
],
flaky = True,
race = "on",
deps = [
"//config",
"//ddl/util",
"//parser/model",
"//parser/mysql",
"//sessionctx/binloginfo",
"//store/mockstore/mockcopr",
"//testkit",
"//testkit/testsetup",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_goleak//:goleak",
],
Expand Down
Loading

0 comments on commit 6dff69f

Please sign in to comment.