Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: check ddl history, timestamp and privilege for flashback #37265

Merged
merged 34 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ad0f3dd
ddl: forbid exchange partition with temporary table (#37204)
lcwangchao Aug 18, 2022
a1d1356
ddl: Fix for unsigned partitioning expressions (#36830)
mjonss Aug 18, 2022
44f684e
*: only create `TemporaryTableAttachedInfoSchema` if needed (#37196)
lcwangchao Aug 18, 2022
8511b9b
lightning: Fix panic when downstream table schema has changed (#36805)
ForwardStar Aug 19, 2022
7d401c5
expression: fix incorrect unit test from #37036 (#37115)
SeaRise Aug 19, 2022
d6ebc60
expression: make collation work with json type (#37211)
xiongjiwei Aug 19, 2022
8c79898
txn: seperate the prewrite and commit details information to make it …
cfzjywxk Aug 19, 2022
d1f75f0
ddl: set tiflash placement group index to 120 (#37179)
lcwangchao Aug 19, 2022
b690f1c
dumpling: fix wrong behaviour of StrictCollationCompatible (#37243)
lance6716 Aug 19, 2022
41b9e26
ddl: use the correct schema version when waitSchemaSynced (#37210)
wjhuang2016 Aug 19, 2022
c61199a
*: more concise bazel log (#37247)
xhebox Aug 19, 2022
8b5b724
bazel: enable announcerc (#37217)
hawkingrei Aug 19, 2022
9af0f03
*: only create snapshot interceptor for temporary table when needed (…
lcwangchao Aug 22, 2022
aaf0613
doc: add design doc for stats lru cache (#36804)
Yisaer Aug 22, 2022
1b44540
executor: check ddl history, timestamp and privilege for flashback
Defined2014 Aug 17, 2022
4cf7eee
planner: add warn log for sync stats (#36956)
Yisaer Aug 22, 2022
76485da
ddl: fix compile
Defined2014 Aug 22, 2022
e0da196
planner: support HashJoin cost detail (#37012)
Yisaer Aug 22, 2022
34bc21a
follow comments
Defined2014 Aug 22, 2022
3977a0a
parser: fix test_driver with RestoreStringWithoutCharset and RestoreS…
Defined2014 Aug 22, 2022
2745f1a
table partition: add test for exchange partition (#37083)
ymkzpx Aug 22, 2022
ffc5e17
ddl/lightning: add a memory tracker for DDL lightning (#37271)
tangenta Aug 22, 2022
21847fe
planner: set EnableOuterJoinReorder to false by default (#37264)
winoros Aug 22, 2022
cf69205
expression: refactor json path syntax and make it compatible with mys…
xiongjiwei Aug 23, 2022
a68be97
use schemaVersion to check ddl history
Defined2014 Aug 23, 2022
9d1cfb2
improve log
Defined2014 Aug 23, 2022
3cd1760
improve comments
Defined2014 Aug 23, 2022
75e0885
*: allow setting placement policy and tiflash replica at the same tim…
lcwangchao Aug 23, 2022
b01e4c4
rollback some function
Defined2014 Aug 23, 2022
fe40d3c
expression: add json opaque value (#37200)
YangKeao Aug 23, 2022
6206b95
planner: remove useless part for the preparedStmtExec (#37293)
Reminiscent Aug 23, 2022
0e4af6c
planner: fix outer join reorder will push down its outer join conditi…
AilinKid Aug 23, 2022
cfd4ddd
planner: reuse DetachCondsAndBuildRanges logic for both index and clu…
xuyifangreeneyes Aug 23, 2022
a65ad21
Merge branch 'master' into pre-check
Defined2014 Aug 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
srcs = [
"backfilling.go",
"callback.go",
"cluster.go",
"column.go",
"constant.go",
"ddl.go",
Expand Down
77 changes: 77 additions & 0 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
)

func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
var flashbackTS uint64
if err := job.DecodeArgs(&flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now).
if flashbackSchemaVersion != nowSchemaVersion {
job.State = model.JobStateCancelled
return ver, errors.Errorf("schema version not same, have done ddl during [flashbackTS, now)")
}

sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
defer w.sessPool.put(sess)

jobs, err := GetAllDDLJobs(sess, t)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// Other non-flashback ddl jobs in queue, return error.
if len(jobs) != 1 {
job.State = model.JobStateCancelled
var otherJob *model.Job
for _, j := range jobs {
if j.ID != job.ID {
otherJob = j
break
}
}
return ver, errors.Errorf("have other ddl jobs(jobID: %d) in queue, can't do flashback", otherJob.ID)
}

job.State = model.JobStateDone
return ver, errors.Trace(err)
}
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
Expand Down
11 changes: 11 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2593,6 +2593,17 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo
}
}

func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error {
job := &model.Job{
Type: model.ActionFlashbackCluster,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{flashbackTS},
}
err := d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterCacheTable(d, t, job)
case model.ActionAlterNoCacheTable:
ver, err = onAlterNoCacheTable(d, t, job)
case model.ActionFlashbackCluster:
ver, err = w.onFlashbackCluster(d, t, job)
case model.ActionMultiSchemaChange:
ver, err = onMultiSchemaChange(w, d, t, job)
default:
Expand Down
6 changes: 6 additions & 0 deletions ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ func (d Checker) RecoverTable(ctx sessionctx.Context, recoverInfo *ddl.RecoverIn
panic("implement me")
}

// FlashbackCluster implements the DDL interface.
func (d Checker) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) (err error) {
//TODO implement me
panic("implement me")
}

// DropView implements the DDL interface.
func (d Checker) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
err = d.realDDL.DropView(ctx, stmt)
Expand Down
5 changes: 5 additions & 0 deletions ddl/schematracker/dm_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ func (d SchemaTracker) RecoverTable(ctx sessionctx.Context, recoverInfo *ddl.Rec
return nil
}

// FlashbackCluster implements the DDL interface, which is no-op in DM's case.
func (d SchemaTracker) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) (err error) {
return nil
}

// DropView implements the DDL interface.
func (d SchemaTracker) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
notExistTables := make([]string, 0, len(stmt.Tables))
Expand Down
2 changes: 1 addition & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ go_test(
"pkg_test.go",
"point_get_test.go",
"prepared_test.go",
"recover_table_test.go",
"recover_test.go",
"resource_tag_test.go",
"revoke_test.go",
"rowid_test.go",
Expand Down
59 changes: 54 additions & 5 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,24 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand All @@ -59,8 +64,8 @@ func (e *DDLExec) toErr(err error) error {
checker := domain.NewSchemaChecker(dom, e.is.SchemaMetaVersion(), nil)
txn, err1 := e.ctx.Txn(true)
if err1 != nil {
logutil.BgLogger().Error("active txn failed", zap.Error(err))
return err1
logutil.BgLogger().Error("active txn failed", zap.Error(err1))
return err
}
_, schemaInfoErr := checker.Check(txn.StartTS())
if schemaInfoErr != nil {
Expand Down Expand Up @@ -170,7 +175,7 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
case *ast.FlashBackTableStmt:
err = e.executeFlashbackTable(x)
case *ast.FlashBackClusterStmt:
err = e.executeFlashBackCluster(x)
err = e.executeFlashBackCluster(ctx, x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.TruncateTableStmt:
Expand Down Expand Up @@ -516,8 +521,52 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J
return jobInfo, tableInfo, nil
}

func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackClusterStmt) error {
return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("FLASHBACK CLUSTER")
// ValidateFlashBackTS validates that flashBackTS in range [gcSafePoint, currentTS)
func ValidateFlashBackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
}
gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
if err != nil {
return err
}

return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func (e *DDLExec) executeFlashBackCluster(ctx context.Context, s *ast.FlashBackClusterStmt) error {
checker := privilege.GetPrivilegeManager(e.ctx)
if !checker.RequestVerification(e.ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.SuperPriv) {
return core.ErrSpecificAccessDenied.GenWithStackByArgs("SUPER")
}

tiFlashInfo, err := getTiFlashStores(e.ctx)
if err != nil {
return err
}
if len(tiFlashInfo) != 0 {
return errors.Errorf("not support flash back cluster with TiFlash stores")
}

flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, &s.AsOf)
if err != nil {
return err
}
if err = ValidateFlashBackTS(ctx, e.ctx, flashbackTS); err != nil {
return err
}

return domain.GetDomain(e.ctx).DDL().FlashbackCluster(e.ctx, flashbackTS)
}

func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
Expand Down
47 changes: 47 additions & 0 deletions executor/recover_table_test.go → executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/gcutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -289,6 +292,50 @@ func TestRecoverTableMeetError(t *testing.T) {
tk.MustContainErrMsg("select * from t_recover", "Table 'test_recover.t_recover' doesn't exist")
}

func TestRecoverClusterMeetError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

// Get GC safe point error.
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "cannot set flashback timestamp to future time")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), "can not get 'tikv_gc_safe_point'")

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

//set GC safe point.
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

// out of GC safe point range.
tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-60*60*60*time.Second)), int(variable.ErrSnapshotTooOld.Code()))

// Flashback without super privilege.
tk.MustExec("CREATE USER 'testflashback'@'localhost';")
newTk := testkit.NewTestKit(t, store)
require.True(t, newTk.Session().Auth(&auth.UserIdentity{Username: "testflashback", Hostname: "localhost"}, nil, nil))
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), int(core.ErrSpecificAccessDenied.Code()))
tk.MustExec("drop user 'testflashback'@'localhost';")

// Flashback failed because of ddl history.
tk.MustExec("use test;")
tk.MustExec("create table t(a int);")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), "schema version not same, have done ddl during [flashbackTS, now)")
}

func TestRecoverClusterWithTiFlash(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(1))
tk := testkit.NewTestKit(t, store)

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

//set GC safe point
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)),
"not support flash back cluster with TiFlash stores")
}

// MockGC is used to make GC work in the test environment.
func MockGC(tk *testkit.TestKit) (string, string, string, func()) {
originGC := ddlutil.IsEmulatorGCEnable()
Expand Down
6 changes: 4 additions & 2 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
ActionCreateTables ActionType = 60
ActionMultiSchemaChange ActionType = 61
ActionSetTiFlashMode ActionType = 62
ActionFlashbackCluster ActionType = 63
)

var actionMap = map[ActionType]string{
Expand Down Expand Up @@ -158,6 +159,7 @@ var actionMap = map[ActionType]string{
ActionAlterTableStatsOptions: "alter table statistics options",
ActionMultiSchemaChange: "alter table multi-schema change",
ActionSetTiFlashMode: "set tiflash mode",
ActionFlashbackCluster: "flashback cluster",

// `ActionAlterTableAlterPartition` is removed and will never be used.
// Just left a tombstone here for compatibility.
Expand Down Expand Up @@ -676,9 +678,9 @@ func (job *Job) IsDependentOn(other *Job) (bool, error) {
}

// IsFinished returns whether job is finished or not.
// If the job state is Done or Cancelled, it is finished.
// If the job state is Done, Cancelled, RollbackDone or Synced, it is finished.
func (job *Job) IsFinished() bool {
return job.State == JobStateDone || job.State == JobStateRollbackDone || job.State == JobStateCancelled
return job.State == JobStateDone || job.State == JobStateRollbackDone || job.State == JobStateCancelled || job.State == JobStateSynced
}

// IsCancelled returns whether the job is cancelled or not.
Expand Down