ddl: use the correct schema version when waitSchemaSynced #37210

Merged
Changes from 6 commits
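Summary of the change, condensed from the diff below: waitSchemaSynced previously fetched the "global" schema version from etcd through SchemaSyncer.MustGetGlobalVersion. If the DDL owner went down after committing a schema change but before publishing the new global version to etcd (the scenario the new TestTiDBDownBeforeUpdateGlobalVersion simulates), the etcd value could lag behind what was actually written to storage, so the wait could run against the wrong version. The PR makes waitSchemaSynced read the newest schema version with a non-empty diff from a storage snapshot instead, and turns it into an error-returning function so its callers retry the sync rather than silently continuing. MustGetGlobalVersion is removed from the SchemaSyncer interface, from its etcd and mock implementations, from its test, and from the metrics labels.

The core of the change, lifted from the ddl_worker.go hunk rather than written as a drop-in snippet:

```go
// Before (removed in this PR): the version came from etcd and could be stale
// if the owner died before updating the global version there.
//   latestSchemaVersion, err := d.schemaSyncer.MustGetGlobalVersion(ctx)

// After: read the newest schema version that has a non-empty diff straight
// from a snapshot of the store, so a missed etcd update cannot mislead the wait.
ver, _ := w.store.CurrentVersion(kv.GlobalTxnScope)
snapshot := w.store.GetSnapshot(ver)
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
```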
14 changes: 14 additions & 0 deletions ddl/db_test.go
@@ -1714,3 +1714,17 @@ func TestBuildMaxLengthIndexWithNonRestrictedSqlMode(t *testing.T) {
}
}
}

func TestTiDBDownBeforeUpdateGlobalVersion(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion", `return(true)`))
Review comment from a Contributor:
Please do Disable after using.

tk.MustExec("alter table t add column b int")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion"))
}
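The reviewer's note above is about failpoint hygiene: a failpoint that is enabled but never disabled leaks into later tests in the same process. One way to make the cleanup hard to forget (a sketch of the general pattern, not necessarily what this PR ends up doing; enableFailpoint is a hypothetical helper) is to register the Disable as test cleanup right after a successful Enable:

```go
// Hypothetical helper: enable a failpoint and schedule its removal in one
// place, so it is disabled even if an assertion fails the test early.
// Assumes the testify require and pingcap/failpoint packages already used above.
func enableFailpoint(t *testing.T, name, expr string) {
	require.NoError(t, failpoint.Enable(name, expr))
	t.Cleanup(func() {
		require.NoError(t, failpoint.Disable(name))
	})
}
```

With such a helper the test body would only need enableFailpoint(t, "github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion", `return(true)`) for each failpoint, and the explicit Disable calls at the end of the test could be dropped.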
42 changes: 35 additions & 7 deletions ddl/ddl_worker.go
Expand Up @@ -54,6 +54,8 @@ var (
ddlWorkerID = atomicutil.NewInt32(0)
// WaitTimeWhenErrorOccurred is waiting interval when processing DDL jobs encounter errors.
WaitTimeWhenErrorOccurred = int64(1 * time.Second)

mockDDLErrOnce = int64(0)
)

// GetWaitTimeWhenErrorOccurred return waiting interval when processing DDL jobs encounter errors.
@@ -782,6 +784,15 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) error {
time.Sleep(GetWaitTimeWhenErrorOccurred())
}

failpoint.Inject("mockDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce == 0 {
mockDDLErrOnce = schemaVer
failpoint.Return(errors.New("mock for ddl down"))
}
}
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
@@ -875,9 +886,11 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
}

if once {
w.waitSchemaSynced(d, job, waitTime)
once = false
return nil
err = w.waitSchemaSynced(d, job, waitTime)
if err == nil {
once = false
}
return err
}

if job.IsDone() || job.IsRollbackDone() {
@@ -1299,19 +1312,34 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time
// but in this case we don't wait enough 2 * lease time to let other servers update the schema.
// So here we get the latest schema version to make sure all servers' schema version update to the latest schema version
// in a cluster, or to wait for 2 * lease time.
func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) {
func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
ctx, cancelFunc := context.WithTimeout(w.ctx, waitTime)
defer cancelFunc()

latestSchemaVersion, err := d.schemaSyncer.MustGetGlobalVersion(ctx)
ver, _ := w.store.CurrentVersion(kv.GlobalTxnScope)
snapshot := w.store.GetSnapshot(ver)
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
logutil.Logger(w.logCtx).Warn("[ddl] get global version failed", zap.Error(err))
return
return err
}

failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
if val.(bool) {
if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
panic("check down before update global version failed")
} else {
mockDDLErrOnce = -1
}
}
})

w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job)
return nil
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
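Taken together, the two failpoints in this file recreate the failure the PR guards against: mockDownBeforeUpdateGlobalVersion records the schema version of the first state change in mockDDLErrOnce and makes HandleDDLJobTable return an error before the global version would have been published, as if the owner had gone down at that point. When the job is handled again, checkDownBeforeUpdateGlobalVersion asserts that the version waitSchemaSynced reads back from the storage snapshot is exactly the recorded one and panics otherwise; TestTiDBDownBeforeUpdateGlobalVersion drives this sequence.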
10 changes: 8 additions & 2 deletions ddl/job_table.go
@@ -230,8 +230,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
// we should wait 2 * d.lease time to guarantee all TiDB servers have finished the schema change.
// see waitSchemaSynced for more details.
if !d.isSynced(job) || d.once.Load() {
wk.waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
d.once.Store(false)
err := wk.waitSchemaSynced(d.ddlCtx, job, 2*d.lease)
if err == nil {
d.once.Store(false)
} else {
logutil.BgLogger().Warn("[ddl] wait ddl job sync failed", zap.Error(err), zap.String("job", job.String()))
time.Sleep(time.Second)
return
}
}
if err := wk.HandleDDLJobTable(d.ddlCtx, job); err != nil {
logutil.BgLogger().Info("[ddl] handle ddl job failed", zap.Error(err), zap.String("job", job.String()))
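Note the changed control flow in delivery2worker: d.once is cleared only when waitSchemaSynced succeeds; on failure the worker logs a warning, backs off for a second, and returns without calling HandleDDLJobTable, so the job is not processed against a possibly unsynced schema and is left for a later delivery to retry (the re-delivery itself happens in scheduling code outside this diff).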
5 changes: 0 additions & 5 deletions ddl/mock.go
@@ -91,11 +91,6 @@ func (s *MockSchemaSyncer) OwnerUpdateGlobalVersion(ctx context.Context, version
return nil
}

// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *MockSchemaSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
return 0, nil
}

// OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface.
func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error {
ticker := time.NewTicker(mockCheckVersInterval)
43 changes: 0 additions & 43 deletions ddl/syncer/syncer.go
@@ -67,8 +67,6 @@ type SchemaSyncer interface {
GlobalVersionCh() clientv3.WatchChan
// WatchGlobalSchemaVer watches the global schema version.
WatchGlobalSchemaVer(ctx context.Context)
// MustGetGlobalVersion gets the global version. The only reason it fails is that ctx is done.
MustGetGlobalVersion(ctx context.Context) (int64, error)
// Done returns a channel that closes when the syncer is no longer being refreshed.
Done() <-chan struct{}
// Restart restarts the syncer when it's no longer being refreshed.
@@ -236,47 +234,6 @@ func (s *schemaVersionSyncer) removeSelfVersionPath() error {
return errors.Trace(err)
}

// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
startTime := time.Now()
var (
err error
ver int
resp *clientv3.GetResponse
)
failedCnt := 0
intervalCnt := int(time.Second / util.KeyOpRetryInterval)

defer func() {
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
if err != nil {
if failedCnt%intervalCnt == 0 {
logutil.BgLogger().Info("[ddl] syncer get global version failed", zap.Error(err))
}
time.Sleep(util.KeyOpRetryInterval)
failedCnt++
}

if util.IsContextDone(ctx) {
err = errors.Trace(ctx.Err())
return 0, err
}

resp, err = s.etcdCli.Get(ctx, util.DDLGlobalSchemaVersion)
if err != nil {
continue
}
if len(resp.Kvs) > 0 {
ver, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err == nil {
return int64(ver), nil
}
}
}
}

// OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface.
func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer int64) error {
startTime := time.Now()
11 changes: 1 addition & 10 deletions ddl/syncer/syncer_test.go
@@ -85,15 +85,6 @@ func TestSyncerSimple(t *testing.T) {

key := util2.DDLAllSchemaVersions + "/" + d.OwnerManager().ID()
checkRespKV(t, 1, key, InitialVersion, resp.Kvs...)
// for MustGetGlobalVersion function
globalVer, err := d.SchemaSyncer().MustGetGlobalVersion(ctx)
require.NoError(t, err)
require.Equal(t, InitialVersion, fmt.Sprintf("%d", globalVer))

childCtx, cancel := context.WithTimeout(ctx, minInterval)
defer cancel()
_, err = d.SchemaSyncer().MustGetGlobalVersion(childCtx)
require.True(t, isTimeoutError(err))

ic2 := infoschema.NewCache(2)
ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0)
@@ -138,7 +129,7 @@ func TestSyncerSimple(t *testing.T) {
require.Equal(t, "", checkErr)

// for CheckAllVersions
childCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond)
childCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
require.Error(t, d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer))
cancel()

1 change: 0 additions & 1 deletion metrics/ddl.go
@@ -71,7 +71,6 @@ var (
}, []string{LblResult})

OwnerUpdateGlobalVersion = "update_global_version"
OwnerGetGlobalVersion = "get_global_version"
OwnerCheckAllVersions = "check_all_versions"
OwnerHandleSyncerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{