diff --git a/DEPS.bzl b/DEPS.bzl index 43f76556ecdd9..351129c23b51f 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2915,8 +2915,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=", - version = "v0.0.0-20221123043343-cdc67325f05f", + sum = "h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=", + version = "v0.0.0-20221129023506-621ec37aac7a", ) go_repository( name = "com_github_pingcap_log", @@ -3519,8 +3519,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:nfdLYzkmYR2b0XurXSXcVFk06eO//P4VtkqoADX2tR4=", - version = "v2.0.3-0.20221128025602-81939ec8b2bb", + sum = "h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA=", + version = "v2.0.3-0.20221129032117-857772dd0907", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index 7432c3d9af0ce..b4ed1c1144dd8 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -57,7 +57,7 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor return totalRegions, errors.Trace(err) } - if err := recovery.PrepareFlashbackToVersion(ctx); err != nil { + if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil { return totalRegions, errors.Trace(err) } @@ -304,12 +304,12 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { } // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state -func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context) (err error) { +func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) { var totalRegions atomic.Uint64 totalRegions.Store(0) handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), r) + stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r) totalRegions.Add(uint64(stats.CompletedRegions)) return stats, err } diff --git a/ddl/cluster.go b/ddl/cluster.go index ebb833156cec2..96a7cd8544abb 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -50,11 +50,7 @@ import ( ) var pdScheduleKey = []string{ - "hot-region-schedule-limit", - "leader-schedule-limit", "merge-schedule-limit", - "region-schedule-limit", - "replica-schedule-limit", } const ( @@ -68,6 +64,7 @@ const ( autoAnalyzeOffset readOnlyOffset totalLockedRegionsOffset + startTSOffset commitTSOffset ) @@ -280,6 +277,7 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) { func SendPrepareFlashbackToVersionRPC( ctx context.Context, s tikv.Storage, + flashbackTS, startTS uint64, r tikvstore.KeyRange, ) (rangetask.TaskStat, error) { startKey, rangeEndKey := r.StartKey, r.EndKey @@ -314,6 +312,8 @@ func SendPrepareFlashbackToVersionRPC( req := tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{ StartKey: startKey, EndKey: endKey, + StartTs: startTS, + Version: flashbackTS, }) resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) @@ -481,11 +481,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") } - var flashbackTS, lockedRegions, commitTS uint64 + var flashbackTS, lockedRegions, startTS, commitTS uint64 var pdScheduleValue map[string]interface{} var autoAnalyzeValue, readOnlyValue string var gcEnabledValue bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &commitTS); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -533,6 +533,13 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } + // We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC. + startTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[startTSOffset] = startTS job.SchemaState = model.StateWriteOnly return ver, nil // Stage 3, get key ranges and get locks. @@ -552,7 +559,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve for _, r := range keyRanges { if err = flashbackToVersion(d.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r) + stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, r) totalRegions.Add(uint64(stats.CompletedRegions)) return stats, err }, r.StartKey, r.EndKey); err != nil { @@ -587,8 +594,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve for _, r := range keyRanges { if err = flashbackToVersion(d.ctx, d, func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - // Use commitTS - 1 as startTS, make sure it less than commitTS. - stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r) + // Use same startTS as prepare phase to simulate 1PC txn. + stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, commitTS, r) completedRegions.Add(uint64(stats.CompletedRegions)) logutil.BgLogger().Info("[ddl] flashback cluster stats", zap.Uint64("complete regions", completedRegions.Load()), @@ -615,12 +622,12 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { return nil } - var flashbackTS, lockedRegions, commitTS uint64 + var flashbackTS, lockedRegions, startTS, commitTS uint64 var pdScheduleValue map[string]interface{} var autoAnalyzeValue, readOnlyValue string var gcEnabled bool - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &commitTS); err != nil { + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil { return errors.Trace(err) } sess, err := w.sessPool.get() diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 2cbf5ee45336a..4c1ec291f87f2 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -98,7 +98,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { fmt.Sprintf("return(%v)", injectSafeTS))) oldValue := map[string]interface{}{ - "hot-region-schedule-limit": 1, + "merge-schedule-limit": 1, } require.NoError(t, infosync.SetPDScheduleConfig(context.Background(), oldValue)) @@ -112,7 +112,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { if job.SchemaState == model.StateWriteReorganization { closeValue, err := infosync.GetPDScheduleConfig(context.Background()) assert.NoError(t, err) - assert.Equal(t, closeValue["hot-region-schedule-limit"], 0) + assert.Equal(t, closeValue["merge-schedule-limit"], 0) // cancel flashback job job.State = model.JobStateCancelled job.Error = dbterror.ErrCancelledDDLJob @@ -128,7 +128,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { finishValue, err := infosync.GetPDScheduleConfig(context.Background()) require.NoError(t, err) - require.EqualValues(t, finishValue["hot-region-schedule-limit"], 1) + require.EqualValues(t, finishValue["merge-schedule-limit"], 1) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 1816e0d65891d..6e2373d12f52f 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2724,6 +2724,14 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error { logutil.BgLogger().Info("[ddl] get flashback cluster job", zap.String("flashbackTS", oracle.GetTimeFromTS(flashbackTS).String())) + nowTS, err := ctx.GetStore().GetOracle().GetTimestamp(d.ctx, &oracle.Option{}) + if err != nil { + return errors.Trace(err) + } + gap := time.Until(oracle.GetTimeFromTS(nowTS)).Abs() + if gap > 1*time.Second { + ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("Gap between local time and PD TSO is %s, please check PD/system time", gap)) + } job := &model.Job{ Type: model.ActionFlashbackCluster, BinlogInfo: &model.HistoryInfo{}, @@ -2735,9 +2743,10 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error variable.On, /* tidb_enable_auto_analyze */ variable.Off, /* tidb_super_read_only */ 0, /* totalRegions */ - 0 /* newCommitTS */}, + 0, /* startTS */ + 0 /* commitTS */}, } - err := d.DoDDLJob(ctx, job) + err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) return errors.Trace(err) } diff --git a/go.mod b/go.mod index f225545bfafd1..502ca90e47ea9 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f + github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -86,7 +86,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.3-0.20221128025602-81939ec8b2bb + github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 70c92d02d2881..dc6888f53d3b2 100644 --- a/go.sum +++ b/go.sum @@ -779,8 +779,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4= -github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw= +github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -930,8 +930,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.3-0.20221128025602-81939ec8b2bb h1:nfdLYzkmYR2b0XurXSXcVFk06eO//P4VtkqoADX2tR4= -github.com/tikv/client-go/v2 v2.0.3-0.20221128025602-81939ec8b2bb/go.mod h1:kqFVxpx40hAgqqLHXLEPJDM/j6ZVfH5CNdJEtkJvO58= +github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907 h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA= +github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=