Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: flashback/recover a table will clear its placement settings #31670

Merged
merged 2 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,11 +2181,11 @@ func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
tk.MustExec("recover table tp1")
tk.MustQuery("show create table tp1").Check(testkit.Rows("tp1 CREATE TABLE `tp1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
" PARTITION `p2` VALUES LESS THAN (10000))"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp1")

// test flashback
Expand All @@ -2201,11 +2201,11 @@ func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
tk.MustExec("flashback table tp2")
tk.MustQuery("show create table tp2").Check(testkit.Rows("tp2 CREATE TABLE `tp2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
" PARTITION `p2` VALUES LESS THAN (10000))"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp2")

// test recover after police drop
Expand All @@ -2217,10 +2217,10 @@ func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
tk.MustExec("flashback table tp2 to tp3")
tk.MustQuery("show create table tp3").Check(testkit.Rows("tp3 CREATE TABLE `tp3` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
" PARTITION `p2` VALUES LESS THAN (10000))"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp3")
}
36 changes: 28 additions & 8 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,8 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

bundles, err := placement.NewFullTableBundles(t, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
// Clear all placement when recover
err = clearTablePlacementAndBundles(tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -465,6 +459,32 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
return ver, nil
}

func clearTablePlacementAndBundles(tblInfo *model.TableInfo) error {
var bundles []*placement.Bundle
if tblInfo.PlacementPolicyRef != nil || tblInfo.DirectPlacementOpts != nil {
tblInfo.PlacementPolicyRef = nil
tblInfo.DirectPlacementOpts = nil
bundles = append(bundles, placement.NewBundle(tblInfo.ID))
}

if tblInfo.Partition != nil {
for i := range tblInfo.Partition.Definitions {
par := &tblInfo.Partition.Definitions[i]
if par.PlacementPolicyRef != nil || par.DirectPlacementOpts != nil {
par.PlacementPolicyRef = nil
par.DirectPlacementOpts = nil
bundles = append(bundles, placement.NewBundle(par.ID))
}
}
}

if len(bundles) == 0 {
return nil
}

return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}

// mockRecoverTableCommitErrOnce uses to make sure
// `mockRecoverTableCommitErr` only mock error once.
var mockRecoverTableCommitErrOnce uint32
Expand Down
42 changes: 0 additions & 42 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,10 +621,6 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand All @@ -639,40 +635,6 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

// recoverTablePlacement is used when recover/flashback table.
// It will replace the placement policy of table with the direct options because the original policy may be deleted
func recoverTablePlacement(snapshotMeta *meta.Meta, tblInfo *model.TableInfo) (*model.TableInfo, error) {
if ref := tblInfo.PlacementPolicyRef; ref != nil {
policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

tblInfo.PlacementPolicyRef = nil
tblInfo.DirectPlacementOpts = policy.PlacementSettings
}

if tblInfo.Partition != nil {
for idx := range tblInfo.Partition.Definitions {
def := &tblInfo.Partition.Definitions[idx]
ref := def.PlacementPolicyRef
if ref == nil {
continue
}

policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

def.PlacementPolicyRef = nil
def.DirectPlacementOpts = policy.PlacementSettings
}
}

return tblInfo, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
job, err := t.GetHistoryDDLJob(s.JobID)
if err != nil {
Expand Down Expand Up @@ -799,10 +761,6 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand Down